Merge branch 'develop' into handle-exception-fix
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index cd60af1..b1c6765 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -1,4 +1,5 @@
 public abstract class kotlinx/coroutines/experimental/AbstractCoroutine : kotlin/coroutines/experimental/Continuation, kotlinx/coroutines/experimental/CoroutineScope, kotlinx/coroutines/experimental/Job {
+	protected final field parentContext Lkotlin/coroutines/experimental/CoroutineContext;
 	public fun <init> (Lkotlin/coroutines/experimental/CoroutineContext;Z)V
 	public synthetic fun <init> (Lkotlin/coroutines/experimental/CoroutineContext;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
 	public final fun getContext ()Lkotlin/coroutines/experimental/CoroutineContext;
@@ -392,7 +393,7 @@
 
 public abstract interface class kotlinx/coroutines/experimental/Job : kotlin/coroutines/experimental/CoroutineContext$Element {
 	public static final field Key Lkotlinx/coroutines/experimental/Job$Key;
-	public abstract fun attachChild (Lkotlinx/coroutines/experimental/Job;)Lkotlinx/coroutines/experimental/DisposableHandle;
+	public abstract fun attachChild (Lkotlinx/coroutines/experimental/ChildJob;)Lkotlinx/coroutines/experimental/ChildHandle;
 	public abstract fun cancel ()Z
 	public abstract fun cancel (Ljava/lang/Throwable;)Z
 	public abstract synthetic fun cancelChildren (Ljava/lang/Throwable;)V
@@ -467,7 +468,7 @@
 
 public final class kotlinx/coroutines/experimental/NonCancellable : kotlin/coroutines/experimental/AbstractCoroutineContextElement, kotlinx/coroutines/experimental/Job {
 	public static final field INSTANCE Lkotlinx/coroutines/experimental/NonCancellable;
-	public fun attachChild (Lkotlinx/coroutines/experimental/Job;)Lkotlinx/coroutines/experimental/DisposableHandle;
+	public fun attachChild (Lkotlinx/coroutines/experimental/ChildJob;)Lkotlinx/coroutines/experimental/ChildHandle;
 	public fun cancel ()Z
 	public fun cancel (Ljava/lang/Throwable;)Z
 	public synthetic fun cancelChildren (Ljava/lang/Throwable;)V
@@ -487,8 +488,9 @@
 	public fun start ()Z
 }
 
-public final class kotlinx/coroutines/experimental/NonDisposableHandle : kotlinx/coroutines/experimental/DisposableHandle {
+public final class kotlinx/coroutines/experimental/NonDisposableHandle : kotlinx/coroutines/experimental/ChildHandle, kotlinx/coroutines/experimental/DisposableHandle {
 	public static final field INSTANCE Lkotlinx/coroutines/experimental/NonDisposableHandle;
+	public fun childCancelled (Ljava/lang/Throwable;)Z
 	public fun dispose ()V
 	public fun toString ()Ljava/lang/String;
 }
diff --git a/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt b/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt
index f674a5e..020af59 100644
--- a/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt
+++ b/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt
@@ -5,6 +5,7 @@
 package kotlinx.coroutines.experimental
 
 import kotlinx.coroutines.experimental.CoroutineStart.*
+import kotlinx.coroutines.experimental.internal.*
 import kotlinx.coroutines.experimental.intrinsics.*
 import kotlin.coroutines.experimental.*
 
@@ -14,12 +15,12 @@
  * This class implements completion [Continuation], [Job], and [CoroutineScope] interfaces.
  * It stores the result of continuation in the state of the job.
  * This coroutine waits for children coroutines to finish before completing and
- * is cancelled through an intermediate _cancelling_ state.
+ * fails through an intermediate _failing_ state.
  *
  * The following methods are available for override:
  *
  * * [onStart] is invoked when coroutine is create in not active state and is [started][Job.start].
- * * [onCancellation] is invoked as soon as coroutine is [cancelled][cancel] (becomes _cancelling_)
+ * * [onCancellation] is invoked as soon as coroutine is _failing_, or is cancelled,
  *   or when it completes for any reason.
  * * [onCompleted] is invoked when coroutine completes with a value.
  * * [onCompletedExceptionally] in invoked when coroutines completes with exception.
@@ -33,12 +34,22 @@
 @Suppress("EXPOSED_SUPER_CLASS")
 @InternalCoroutinesApi
 public abstract class AbstractCoroutine<in T>(
-    private val parentContext: CoroutineContext,
+    /**
+     * Context of the parent coroutine.
+     */
+    @JvmField
+    protected val parentContext: CoroutineContext,
     active: Boolean = true
 ) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
+    /**
+     * Context of this coroutine that includes this coroutine as a [Job].
+     */
     @Suppress("LeakingThis")
     public final override val context: CoroutineContext = parentContext + this
-    @Deprecated("Replaced with context", replaceWith = ReplaceWith("context"))
+
+    /**
+     * Context of this scope which is the same as the [context] of this coroutine.
+     */
     public override val coroutineContext: CoroutineContext get() = context
 
     override val isActive: Boolean get() = super<JobSupport>.isActive
@@ -66,20 +77,16 @@
     }
 
     /**
-     * This function is invoked once when this coroutine is cancelled or is completed,
+     * This function is invoked once when this coroutine is cancelled
      * similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
      *
      * The meaning of [cause] parameter:
      * * Cause is `null` when job has completed normally.
      * * Cause is an instance of [CancellationException] when job was cancelled _normally_.
      *   **It should not be treated as an error**. In particular, it should not be reported to error logs.
-     * * Otherwise, the job had _failed_.
+     * * Otherwise, the job had been cancelled or failed with exception.
      */
-    protected open fun onCancellation(cause: Throwable?) {}
-
-    internal override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
-        onCancellation(exceptionally?.cause)
-    }
+    protected override fun onCancellation(cause: Throwable?) {}
 
     /**
      * This function is invoked once when job is completed normally with the specified [value].
@@ -89,10 +96,11 @@
     /**
      * This function is invoked once when job is completed exceptionally with the specified [exception].
      */
+    // todo: rename to onCancelled
     protected open fun onCompletedExceptionally(exception: Throwable) {}
 
     @Suppress("UNCHECKED_CAST")
-    internal override fun onCompletionInternal(state: Any?, mode: Int) {
+    internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
         if (state is CompletedExceptionally)
             onCompletedExceptionally(state.cause)
         else
diff --git a/common/kotlinx-coroutines-core-common/src/Builders.common.kt b/common/kotlinx-coroutines-core-common/src/Builders.common.kt
index 48d2d27..3a0e981 100644
--- a/common/kotlinx-coroutines-core-common/src/Builders.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/Builders.common.kt
@@ -281,20 +281,11 @@
 // --------------- implementation ---------------
 
 private open class StandaloneCoroutine(
-    private val parentContext: CoroutineContext,
+    parentContext: CoroutineContext,
     active: Boolean
 ) : AbstractCoroutine<Unit>(parentContext, active) {
-    override fun hasOnFinishingHandler(update: Any?) = update is CompletedExceptionally
-
-    override fun handleJobException(exception: Throwable) {
-        handleCoroutineException(parentContext, exception, this)
-    }
-
-    override fun onFinishingInternal(update: Any?) {
-        if (update is CompletedExceptionally && update.cause !is CancellationException) {
-            parentContext[Job]?.cancel(update.cause)
-        }
-    }
+    override val cancelsParent: Boolean get() = true
+    override fun handleJobException(exception: Throwable) = handleExceptionViaHandler(parentContext, exception)
 }
 
 private class LazyStandaloneCoroutine(
diff --git a/common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt b/common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt
index d85b784..ba87286 100644
--- a/common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt
+++ b/common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt
@@ -7,10 +7,11 @@
 import kotlinx.coroutines.experimental.selects.*
 
 /**
- * A [Deferred] that can be completed via public functions
- * [complete], [completeExceptionally], and [cancel].
+ * A [Deferred] that can be completed via public functions [complete] or [cancel][Job.cancel].
  *
- * Completion functions return `false` when this deferred value is already complete or completing.
+ * Note that the [complete] function returns `false` when this deferred value is already complete or completing,
+ * while [cancel][Job.cancel] returns `true` as long as the deferred is still _cancelling_ and the corresponding
+ * exception is incorporated into the final [completion exception][getCompletionExceptionOrNull].
  *
  * An instance of completable deferred can be created by `CompletableDeferred()` function in _active_ state.
  *
@@ -32,6 +33,7 @@
      *
      * Repeated invocations of this function have no effect and always produce `false`.
      */
+    @Deprecated(message = "Use cancel", replaceWith = ReplaceWith("cancel(exception)"))
     public fun completeExceptionally(exception: Throwable): Boolean
 }
 
@@ -61,7 +63,7 @@
     parent: Job?
 ) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
     init { initParentJobInternal(parent) }
-    override val onCancelMode: Int get() = ON_CANCEL_MAKE_COMPLETING
+    override val onCancelComplete get() = true
     override fun getCompleted(): T = getCompletedInternal() as T
     override suspend fun await(): T = awaitInternal() as T
     override val onAwait: SelectClause1<T> get() = this
diff --git a/common/kotlinx-coroutines-core-common/src/CompletedExceptionally.kt b/common/kotlinx-coroutines-core-common/src/CompletedExceptionally.kt
index 6c564f2..da3825c 100644
--- a/common/kotlinx-coroutines-core-common/src/CompletedExceptionally.kt
+++ b/common/kotlinx-coroutines-core-common/src/CompletedExceptionally.kt
@@ -8,15 +8,16 @@
 import kotlin.coroutines.experimental.*
 
 /**
- * Class for an internal state of a job that had completed exceptionally, including cancellation.
+ * Class for an internal state of a job that was cancelled (completed exceptionally).
  *
  * **Note: This class cannot be used outside of internal coroutines framework**.
- * **Note: cannot be internal until we get rid of MutableDelegateContinuation in IO**
+ * **Note: cannot be internal and renamed until we get rid of MutableDelegateContinuation in IO**
  *
  * @param cause the exceptional completion cause. It's either original exceptional cause
  *        or artificial [CancellationException] if no cause was provided
  * @suppress **This is unstable API and it is subject to change.**
  */
+// todo: rename to Cancelled
 open class CompletedExceptionally(
     @JvmField public val cause: Throwable
 ) {
@@ -24,20 +25,6 @@
 }
 
 /**
- * A specific subclass of [CompletedExceptionally] for cancelled jobs.
- *
- * **Note: This class cannot be used outside of internal coroutines framework**.
- *
- * @param job the job that was cancelled.
- * @param cause the exceptional completion cause. If `cause` is null, then a [CancellationException] is created.
- * @suppress **This is unstable API and it is subject to change.**
- */
-internal class Cancelled(
-    job: Job,
-    cause: Throwable?
-) : CompletedExceptionally(cause ?: JobCancellationException("Job was cancelled normally", null, job))
-
-/**
  * A specific subclass of [CompletedExceptionally] for cancelled [AbstractContinuation].
  *
  * **Note: This class cannot be used outside of internal coroutines framework**.
diff --git a/common/kotlinx-coroutines-core-common/src/CompletionHandler.common.kt b/common/kotlinx-coroutines-core-common/src/CompletionHandler.common.kt
index c92634f..89f6a97 100644
--- a/common/kotlinx-coroutines-core-common/src/CompletionHandler.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/CompletionHandler.common.kt
@@ -43,3 +43,5 @@
 // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
 // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
 internal expect fun CompletionHandler.invokeIt(cause: Throwable?)
+
+internal inline fun <reified T> CompletionHandler.isHandlerOf(): Boolean = this is T
diff --git a/common/kotlinx-coroutines-core-common/src/CoroutineExceptionHandler.kt b/common/kotlinx-coroutines-core-common/src/CoroutineExceptionHandler.kt
index 088cc69..2450039 100644
--- a/common/kotlinx-coroutines-core-common/src/CoroutineExceptionHandler.kt
+++ b/common/kotlinx-coroutines-core-common/src/CoroutineExceptionHandler.kt
@@ -18,38 +18,42 @@
  *
  * If there is a [Job] in the context and it's not a [caller], then [Job.cancel] is invoked.
  * If invocation returned `true`, method terminates: now [Job] is responsible for handling an exception.
- * Otherwise, If there is [CoroutineExceptionHandler] in the context, it is used.
- * Otherwise all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and [Thread.uncaughtExceptionHandler] are invoked
+ * Otherwise, if there is [CoroutineExceptionHandler] in the context, it is used. If it throws an exception during handling
+ * or is absent, all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and [Thread.uncaughtExceptionHandler] are invoked
+ * todo: Deprecate/hide this function.
  */
 @JvmOverloads // binary compatibility
 @InternalCoroutinesApi
 public fun handleCoroutineException(context: CoroutineContext, exception: Throwable, caller: Job? = null) {
-    // if exception handling fails, make sure the original exception is not lost
+    // Ignore CancellationException (they are normal ways to terminate a coroutine)
+    if (exception is CancellationException) return // nothing to do
+    // Try to propagate the exception to the parent
+    val job = context[Job]
+    if (job !== null && job !== caller && job.cancel(exception)) return // handled by parent
+    // otherwise -- use exception handlers
+    handleExceptionViaHandler(context, exception)
+}
+
+internal fun handleExceptionViaHandler(context: CoroutineContext, exception: Throwable) {
+    // Invoke exception handler from the context if present
     try {
-        // Ignore CancellationException (they are normal ways to terminate a coroutine)
-        if (exception is CancellationException) {
-            return
-        }
-        // If parent is successfully cancelled, we're done, it is now its responsibility to handle the exception
-        val parent = context[Job]
-        // E.g. actor registers itself in the context, in that case we should invoke handler
-        if (parent !== null && parent !== caller && parent.cancel(exception)) {
-            return
-        }
-        // If not, invoke exception handler from the context
         context[CoroutineExceptionHandler]?.let {
             it.handleException(context, exception)
             return
         }
-        // If handler is not present in the context, fallback to the global handler
-        handleCoroutineExceptionImpl(context, exception)
-    } catch (handlerException: Throwable) {
-        // simply rethrow if handler threw the original exception
-        if (handlerException === exception) throw exception
-        // handler itself crashed for some other reason -- that is bad -- keep both
-        throw RuntimeException("Exception while trying to handle coroutine exception", exception).apply {
-            addSuppressedThrowable(handlerException)
-        }
+    } catch (t: Throwable) {
+        handleCoroutineExceptionImpl(context, handlerException(exception, t))
+        return
+    }
+
+    // If handler is not present in the context, fall back to the global handler (a throwing handler is dealt with in the catch above)
+    handleCoroutineExceptionImpl(context, exception)
+}
+
+internal fun handlerException(originalException: Throwable, thrownException: Throwable): Throwable {
+    if (originalException === thrownException) return originalException
+    return RuntimeException("Exception while trying to handle coroutine exception", thrownException).apply {
+        addSuppressedThrowable(originalException)
     }
 }
 
diff --git a/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt b/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt
index 6e1af6d..3935296 100644
--- a/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt
+++ b/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt
@@ -71,7 +71,7 @@
         get() = coroutineContext[Job]?.isActive ?: true
 
     /**
-     * Returns the context of this scope.
+     * Context of this scope.
      */
     public val coroutineContext: CoroutineContext
 }
@@ -175,7 +175,7 @@
  */
 public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
     // todo: optimize implementation to a single allocated object
-    val owner = ScopeOwnerCoroutine<R>(coroutineContext)
+    val owner = ScopeCoroutine<R>(coroutineContext)
     owner.start(CoroutineStart.UNDISPATCHED, owner, block)
     owner.join()
     if (owner.isCancelled) {
diff --git a/common/kotlinx-coroutines-core-common/src/Deferred.kt b/common/kotlinx-coroutines-core-common/src/Deferred.kt
index 964196b..b4681a5 100644
--- a/common/kotlinx-coroutines-core-common/src/Deferred.kt
+++ b/common/kotlinx-coroutines-core-common/src/Deferred.kt
@@ -9,75 +9,44 @@
 import kotlin.coroutines.experimental.*
 
 /**
- * Deferred value is a non-blocking cancellable future.
+ * Deferred value is a non-blocking cancellable future &mdash; it is a [Job] that has a result.
  *
  * It is created with [async][CoroutineScope.async] coroutine builder or via constructor of [CompletableDeferred] class.
  * It is in [active][isActive] state while the value is being computed.
  *
- * Deferred value has the following states:
- *
- * | **State**                               | [isActive] | [isCompleted] | [isCompletedExceptionally] | [isCancelled] |
- * | --------------------------------------- | ---------- | ------------- | -------------------------- | ------------- |
- * | _New_ (optional initial state)          | `false`    | `false`       | `false`                    | `false`       |
- * | _Active_ (default initial state)        | `true`     | `false`       | `false`                    | `false`       |
- * | _Completing_ (optional transient state) | `true`     | `false`       | `false`                    | `false`       |
- * | _Cancelling_ (optional transient state) | `false`    | `false`       | `false`                    | `true`        |
- * | _Cancelled_ (final state)               | `false`    | `true`        | `true`                     | `true`        |
- * | _Resolved_  (final state)               | `false`    | `true`        | `false`                    | `false`       |
- * | _Failed_    (final state)               | `false`    | `true`        | `true`                     | `false`       |
+ * Deferred value has the same state machine as the [Job] with additional convenience methods to retrieve
+ * successful or failed result of the computation that was carried out. The result of the deferred is
+ * available when it is [completed][isCompleted] and can be retrieved by [await] method, which throws
+ * exception if the deferred had failed.
+ * Note, that a _cancelled_ deferred is also considered to be completed.
+ * The corresponding exception can be retrieved via [getCompletionExceptionOrNull] from a completed instance of deferred.
  *
  * Usually, a deferred value is created in _active_ state (it is created and started).
  * However, [async][CoroutineScope.async] coroutine builder has an optional `start` parameter that creates a deferred value in _new_ state
  * when this parameter is set to [CoroutineStart.LAZY].
  * Such a deferred can be be made _active_ by invoking [start], [join], or [await].
  *
- * A deferred can be _cancelled_ at any time with [cancel] function that forces it to transition to
- * _cancelling_ state immediately. Deferred that is not backed by a coroutine (see [CompletableDeferred]) and does not have
- * [children] becomes _cancelled_ on [cancel] immediately.
- * Otherwise, deferred becomes _cancelled_  when it finishes executing its code and
- * when all its children [complete][isCompleted].
- *
- * ```
- *                                                     wait children
- *    +-----+       start      +--------+   complete  +-------------+ finish +-----------+
- *    | New | ---------------> | Active | ----------> | Completing  | ---+-> | Resolved  |
- *    +-----+                  +--------+             +-------------+    |   |(completed)|
- *       |                         |                        |            |   +-----------+
- *       | cancel                  | cancel                 | cancel     |
- *       V                         V                        |            |   +-----------+
- *  +-----------+   finish   +------------+                 |            +-> |  Failed   |
- *  | Cancelled | <--------- | Cancelling | <---------------+                |(completed)|
- *  |(completed)|            +------------+                                  +-----------+
- *  +-----------+
- * ```
- *
  * A deferred value is a [Job]. A job in the
  * [coroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/coroutine-context.html)
  * of [async][CoroutineScope.async] builder represents the coroutine itself.
- * A deferred value is active while the coroutine is working and cancellation aborts the coroutine when
- * the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException]
- * or the cancellation cause inside the coroutine.
- *
- * A deferred value can have a _parent_ job. A deferred value with a parent is cancelled when its parent is
- * cancelled or completes. Parent waits for all its [children] to complete in _completing_ or
- * _cancelling_ state. _Completing_ state is purely internal. For an outside observer a _completing_
- * deferred is still active, while internally it is waiting for its children.
  *
  * All functions on this interface and on all interfaces derived from it are **thread-safe** and can
  * be safely invoked from concurrent coroutines without external synchronization.
  */
 public interface Deferred<out T> : Job {
     /**
-     * Returns `true` if computation of this deferred value has _completed exceptionally_ -- it had
-     * either _failed_ with exception during computation or was [cancelled][cancel].
+     * Returns `true` if computation of this deferred value has _completed exceptionally_.
+     * It is `true` when both [isCompleted] and [isCancelled] are true.
+     * It implies that [isActive] is `false`.
      *
-     * It implies that [isActive] is `false` and [isCompleted] is `true`.
+     * @suppress **Deprecated**: Use [isCancelled] && [isCompleted]
      */
+    @Deprecated("Use isCancelled && isCompleted", ReplaceWith("this.isCancelled && this.isCompleted"))
     public val isCompletedExceptionally: Boolean
 
     /**
      * Awaits for completion of this value without blocking a thread and resumes when deferred computation is complete,
-     * returning the resulting value or throwing the corresponding exception if the deferred had completed exceptionally or was cancelled.
+     * returning the resulting value or throwing the corresponding exception if the deferred was cancelled.
      *
      * This suspending function is cancellable.
      * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
@@ -97,8 +66,7 @@
 
     /**
      * Returns *completed* result or throws [IllegalStateException] if this deferred value has not
-     * [completed][isCompleted] yet. It throws the corresponding exception if this deferred has
-     * [completed exceptionally][isCompletedExceptionally].
+     * [completed][isCompleted] yet. It throws the corresponding exception if this deferred was [cancelled][isCancelled].
      *
      * This function is designed to be used from [invokeOnCompletion] handlers, when there is an absolute certainty that
      * the value is already complete. See also [getCompletionExceptionOrNull].
@@ -109,8 +77,8 @@
     public fun getCompleted(): T
 
     /**
-     * Returns *completion exception* result if this deferred [completed exceptionally][isCompletedExceptionally],
-     * `null` if it is completed normally, or throws [IllegalStateException] if this deferred value has not
+     * Returns *completion exception* result if this deferred was [cancelled][isCancelled] and has [completed][isCompleted],
+     * `null` if it had completed normally, or throws [IllegalStateException] if this deferred value has not
      * [completed][isCompleted] yet.
      *
      * This function is designed to be used from [invokeOnCompletion] handlers, when there is an absolute certainty that
diff --git a/common/kotlinx-coroutines-core-common/src/Job.kt b/common/kotlinx-coroutines-core-common/src/Job.kt
index 374b87e..3713c07 100644
--- a/common/kotlinx-coroutines-core-common/src/Job.kt
+++ b/common/kotlinx-coroutines-core-common/src/Job.kt
@@ -14,59 +14,56 @@
 // --------------- core job interfaces ---------------
 
 /**
- * A background job. Conceptually, a job is a cancellable thing with a simple life-cycle that
+ * A background job. Conceptually, a job is a cancellable thing with a life-cycle that
  * culminates in its completion. Jobs can be arranged into parent-child hierarchies where cancellation
- * or completion of parent immediately cancels all its [children].
+ * of a parent leads to an immediate cancellation of all its [children] and vice versa.
  *
  * The most basic instances of [Job] are created with [launch][CoroutineScope.launch] coroutine builder or with a
- * `Job()` factory function.  Other coroutine builders and primitives like
- * [Deferred] also implement [Job] interface.
+ * `Job()` factory function.
+ * Conceptually, an execution of the job does not produce a result value. Jobs are launched solely for their
+ * side-effects. See [Deferred] interface for a job that produces a result.
  *
  * A job has the following states:
  *
- * | **State**                               | [isActive] | [isCompleted] | [isCancelled] |
- * | --------------------------------------- | ---------- | ------------- | ------------- |
- * | _New_ (optional initial state)          | `false`    | `false`       | `false`       |
- * | _Active_ (default initial state)        | `true`     | `false`       | `false`       |
- * | _Completing_ (optional transient state) | `true`     | `false`       | `false`       |
- * | _Cancelling_ (optional transient state) | `false`    | `false`       | `true`        |
- * | _Cancelled_ (final state)               | `false`    | `true`        | `true`        |
- * | _Completed_ (final state)               | `false`    | `true`        | `false`       |
+ * | **State**                        | [isActive] | [isCompleted] | [isCancelled] |
+ * | -------------------------------- | ---------- | ------------- | ------------- |
+ * | _New_ (optional initial state)   | `false`    | `false`       | `false`       |
+ * | _Active_ (default initial state) | `true`     | `false`       | `false`       |
+ * | _Completing_ (transient state)   | `true`     | `false`       | `false`       |
+ * | _Cancelling_ (transient state)   | `false`    | `false`       | `true`        |
+ * | _Cancelled_ (final state)        | `false`    | `true`        | `true`        |
+ * | _Completed_ (final state)        | `false`    | `true`        | `false`       |
  *
  * Usually, a job is created in _active_ state (it is created and started). However, coroutine builders
  * that provide an optional `start` parameter create a coroutine in _new_ state when this parameter is set to
  * [CoroutineStart.LAZY]. Such a job can be made _active_ by invoking [start] or [join].
  *
- * A job can be _cancelled_ at any time with [cancel] function that forces it to transition to
- * _cancelling_ state immediately. Job that is not backed by a coroutine (see `Job()` function) and does not have
- * [children] becomes _cancelled_ on [cancel] immediately.
- * Otherwise, job becomes _cancelled_  when it finishes executing its code and
- * when all its children [complete][isCompleted].
+ * A job is _active_ while the coroutine is working. Failure of the job makes it _cancelling_.
+ * A job can be cancelled at any time with the [cancel] function, which forces it to transition to
+ * _cancelling_ state immediately. The job becomes _cancelled_ when it finishes executing its work.
  *
  * ```
- *                                                      wait children
- *    +-----+       start      +--------+   complete   +-------------+  finish  +-----------+
- *    | New | ---------------> | Active | -----------> | Completing  | -------> | Completed |
- *    +-----+                  +--------+              +-------------+          +-----------+
- *       |                         |                         |
- *       | cancel                  | cancel                  | cancel
- *       V                         V                         |
- *  +-----------+   finish   +------------+                  |
- *  | Cancelled | <--------- | Cancelling | <----------------+
- *  |(completed)|            +------------+
- *  +-----------+
+ *                                       wait children
+ * +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
+ * | New | -----> | Active | ---------> | Completing  | -------> | Completed |
+ * +-----+        +--------+            +-------------+          +-----------+
+ *                  |        cancel        |
+ *                  |     +----------------+
+ *                  |     |
+ *                  V     V
+ *              +------------+                           finish  +-----------+
+ *              | Cancelling | --------------------------------> | Cancelled |
+ *              +------------+                                   +-----------+
  * ```
  *
- * A job in the
+ * A `Job` instance in the
  * [coroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/coroutine-context.html)
  * represents the coroutine itself.
- * A job is active while the coroutine is working and job's cancellation aborts the coroutine when
- * the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException].
  *
- * A job can have a _parent_ job. A job with a parent is cancelled when its parent is cancelled or completes exceptionally.
- * Parent job waits for all its children to complete in _completing_ or _cancelling_ state.
- * _Completing_ state is purely internal to the job. For an outside observer a _completing_ job is still active,
- * while internally it is waiting for its children.
+ * A job can have a _parent_ job. A job with a parent is cancelled when its parent is cancelled.
+ * Parent job waits in _completing_ or _cancelling_ state for all its children to complete before finishing.
+ * Note, that _completing_ state is purely internal to the job. For an outside observer a _completing_ job is still
+ * active, while internally it is waiting for its children.
  *
  * All functions on this interface and on all interfaces derived from it are **thread-safe** and can
  * be safely invoked from concurrent coroutines without external synchronization.
@@ -97,22 +94,25 @@
     // ------------ state query ------------
 
     /**
-     * Returns `true` when this job is active -- it was already started and has not completed or cancelled yet.
+     * Returns `true` when this job is active -- it was already started and has not completed or failed yet.
      * The job that is waiting for its [children] to complete is still considered to be active if it
-     * was not cancelled.
+     * has not failed and was not cancelled.
      */
     public val isActive: Boolean
 
     /**
-     * Returns `true` when this job has completed for any reason. A job that was cancelled and has
-     * finished its execution is also considered complete. Job becomes complete only after
+     * Returns `true` when this job has completed for any reason. A job that has failed or was cancelled
+     * and has finished its execution is also considered complete. Job becomes complete only after
      * all its [children] complete.
      */
     public val isCompleted: Boolean
 
     /**
-     * Returns `true` if this job was [cancelled][cancel]. In the general case, it does not imply that the
-     * job has already [completed][isCompleted] (it may still be cancelling whatever it was doing).
+     * Returns `true` if this job was cancelled for any reason, either by explicit invocation of [cancel] or
+     * because it had failed or its children or parent was cancelled.
+     * In the general case, it does not imply that the
+     * job has already [completed][isCompleted], because it may still be finishing whatever it was doing and
+     * waiting for its [children] to complete.
      */
     public val isCancelled: Boolean
 
@@ -127,8 +127,7 @@
      * returned. The [CancellationException.cause] of the resulting [CancellationException] references
      * the original cancellation cause that was passed to [cancel] function.
      *
-     * This function throws [IllegalStateException] when invoked on a job that has not
-     * [completed][isCompleted] nor [cancelled][isCancelled] yet.
+     * This function throws [IllegalStateException] when invoked on a job that is still active.
      *
      * @suppress **This an internal API and should not be used from general code.**
      */
@@ -204,21 +203,23 @@
      *
      * A parent-child relation has the following effect:
      * * Cancellation of parent with [cancel] or its exceptional completion (failure)
-     *   immediately cancels all its children.
+     *   immediately fails all its children.
      * * Parent cannot complete until all its children are complete. Parent waits for all its children to
-     *   complete in _completing_ or _cancelling_ state.
+     *   complete in _completing_ or _cancelling_ states.
      *
-     * **A child must store the resulting [DisposableHandle] and [dispose][DisposableHandle.dispose] the attachment
+     * **A child must store the resulting [ChildHandle] and [dispose][DisposableHandle.dispose] the attachment
      * to its parent on its own completion.**
      *
      * Coroutine builders and job factory functions that accept `parent` [CoroutineContext] parameter
      * lookup a [Job] instance in the parent context and use this function to attach themselves as a child.
-     * They also store a reference to the resulting [DisposableHandle] and dispose a handle when they complete.
+     * They also store a reference to the resulting [ChildHandle] and dispose a handle when they complete.
      *
      * @suppress This is an internal API. This method is too error prone for public API.
      */
+    // ChildJob and ChildHandle are made internal on purpose to further deter 3rd-party impl of Job
     @InternalCoroutinesApi
-    public fun attachChild(child: Job): DisposableHandle
+    @Suppress("EXPOSED_FUNCTION_RETURN_TYPE", "EXPOSED_PARAMETER_TYPE")
+    public fun attachChild(child: ChildJob): ChildHandle
 
     /**
      * Cancels all children jobs of this coroutine with the given [cause]. Unlike [cancel],
@@ -268,10 +269,10 @@
     public fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle
 
     /**
-     * @suppress **Deprecated**: Use with named `onCancelling` and `handler` parameters.
+     * @suppress **Deprecated**: Use with named `onCancelling` and `handler` parameters.
      */
-    @Deprecated(message = "Use with named `onCancelling` and `handler` parameters", level = DeprecationLevel.WARNING,
-        replaceWith = ReplaceWith("this.invokeOnCompletion(onCancelling = onCancelling_, handler = handler)"))
+    @Deprecated(message = "Use with named `onCancelling` and `handler` parameters", level = DeprecationLevel.WARNING,
+        replaceWith = ReplaceWith("this.invokeOnCompletion(onCancelling = onCancelling_, handler = handler)"))
     public fun invokeOnCompletion(onCancelling_: Boolean = false, handler: CompletionHandler): DisposableHandle
 
     /**
@@ -299,10 +300,10 @@
     public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
 
     /**
-     * Registers handler that is **synchronously** invoked once on cancellation or completion of this job.
-     * When job is already cancelling or complete, then the handler is immediately invoked
+     * Registers handler that is **synchronously** invoked once on failure or completion of this job.
+     * When job is already failing or complete, then the handler is immediately invoked
      * with a job's cancellation cause or `null` unless [invokeImmediately] is set to false.
-     * Otherwise, handler will be invoked once when this job is cancelled or complete.
+     * Otherwise, handler will be invoked once when this job is failing or is complete.
      *
      * The meaning of `cause` that is passed to the handler:
      * * Cause is `null` when job has completed normally.
@@ -310,12 +311,9 @@
      *   **It should not be treated as an error**. In particular, it should not be reported to error logs.
      * * Otherwise, the job had _failed_.
      *
-     * Invocation of this handler on a transition to a transient _cancelling_ state
+     * Invocation of this handler on a transition to a _failing_ state
      * is controlled by [onCancelling] boolean parameter.
-     * The handler is invoked on invocation of [cancel] when
-     * job becomes _cancelling_ if [onCancelling] parameter is set to `true`. However,
-     * when this [Job] is not backed by a coroutine, like [CompletableDeferred] or [CancellableContinuation]
-     * (both of which do not posses a _cancelling_ state), then the value of [onCancelling] parameter is ignored.
+     * The handler is invoked when the job is failing if the [onCancelling] parameter is set to `true`.
      *
      * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the
      * registration of this handler and release its memory if its invocation is no longer needed.
@@ -330,13 +328,13 @@
      * This function should not be used in general application code.
      * Implementations of `CompletionHandler` must be fast and _lock-free_.
      *
-     * @param onCancelling when `true`, then the [handler] is invoked as soon as this job transitions to _cancelling_ state;
+     * @param onCancelling when `true`, then the [handler] is invoked as soon as this job transitions to _failing_ state;
      *        when `false` then the [handler] is invoked only when it transitions to _completed_ state.
      * @param invokeImmediately when `true` and this job is already in the desired state (depending on [onCancelling]),
      *        then the [handler] is immediately and synchronously invoked and no-op [DisposableHandle] is returned;
      *        when `false` then no-op [DisposableHandle] is returned, but the [handler] is not invoked.
      * @param handler the handler.
-     * 
+     *
      * @suppress **This an internal API and should not be used from general code.**
      */
     @InternalCoroutinesApi
@@ -390,6 +388,41 @@
         }
     }
 
+// -------------------- Parent-child communication --------------------
+
+/**
+ * A reference that parent receives from its child so that it can report its failure.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+internal interface ChildJob : Job {
+    /**
+     * Parent is reporting failure to the child by invoking this method.
+     * Child finds the failure cause using [getCancellationException] of the [parentJob].
+     * This method does nothing if the child is already failing.
+     *
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    public fun parentCancelled(parentJob: Job)
+}
+
+/**
+ * A handle that the child keeps onto its parent so that it is able to report its failure.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+internal interface ChildHandle : DisposableHandle {
+    /**
+     * Child is reporting failure to the parent by invoking this method.
+     * This method is invoked by the child twice. The first time the child reports its root cause as soon as possible,
+     * so that all its siblings and the parent can start finishing their work asap on failure. The second time
+     * the child invokes this method when it has aggregated and determined its final termination cause.
+     *
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    public fun childCancelled(cause: Throwable): Boolean
+}
+
 // -------------------- Job extensions --------------------
 
 /**
@@ -532,10 +565,22 @@
  * @suppress **This an internal API and should not be used from general code.**
  */
 @InternalCoroutinesApi
-public object NonDisposableHandle : DisposableHandle {
-    /** Does not do anything. */
+public object NonDisposableHandle : DisposableHandle, ChildHandle {
+    /**
+     * Does not do anything.
+     * @suppress
+     */
     override fun dispose() {}
 
-    /** Returns "NonDisposableHandle" string. */
+    /**
+     * Returns `false`.
+     * @suppress
+     */
+    override fun childCancelled(cause: Throwable): Boolean = false
+
+    /**
+     * Returns "NonDisposableHandle" string.
+     * @suppress
+     */
     override fun toString(): String = "NonDisposableHandle"
 }
diff --git a/common/kotlinx-coroutines-core-common/src/JobSupport.kt b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
index 4a7fdab..66924ed 100644
--- a/common/kotlinx-coroutines-core-common/src/JobSupport.kt
+++ b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
@@ -12,7 +12,6 @@
 
 /**
  * A concrete implementation of [Job]. It is optionally a child to a parent job.
- * This job is cancelled when the parent is complete, but not vise-versa.
  *
  * This is an open class designed for extension by more specific classes that might augment the
  * state and mare store addition state information for completed jobs, like their result values.
@@ -20,48 +19,47 @@
  * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
  * @suppress **This is unstable API and it is subject to change.**
  */
-internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 {
+internal open class JobSupport constructor(active: Boolean) : Job, ChildJob, SelectClause0 {
     final override val key: CoroutineContext.Key<*> get() = Job
 
     /*
        === Internal states ===
 
-       name       state class    public state  description
-       ------     ------------   ------------  -----------
-       EMPTY_N    EmptyNew     : New           no listeners
-       EMPTY_A    EmptyActive  : Active        no listeners
-       SINGLE     JobNode      : Active        a single listener
-       SINGLE+    JobNode      : Active        a single listener + NodeList added as its next
-       LIST_N     NodeList     : New           a list of listeners (promoted once, does not got back to EmptyNew)
-       LIST_A     NodeList     : Active        a list of listeners (promoted once, does not got back to JobNode/EmptyActive)
-       COMPLETING Finishing    : Completing    has a list of listeners (promoted once from LIST_*)
-       CANCELLING Finishing    : Cancelling    has a list of listeners (promoted once from LIST_*)
-       FINAL_C    Cancelled    : Cancelled     cancelled (final state)
-       FINAL_F    Failed       : Completed     failed for other reason (final state)
-       FINAL_R    <any>        : Completed     produced some result
+       name       state class              public state  description
+       ------     ------------             ------------  -----------
+       EMPTY_N    EmptyNew               : New           no listeners
+       EMPTY_A    EmptyActive            : Active        no listeners
+       SINGLE     JobNode                : Active        a single listener
+       SINGLE+    JobNode                : Active        a single listener + NodeList added as its next
+       LIST_N     InactiveNodeList       : New           a list of listeners (promoted once, does not go back to EmptyNew)
+       LIST_A     NodeList               : Active        a list of listeners (promoted once, does not go back to JobNode/EmptyActive)
+       COMPLETING Finishing              : Completing    has a list of listeners (promoted once from LIST_*)
+       CANCELLING Finishing              : Cancelling    -- " --
+       FINAL_C    Cancelled              : Cancelled     Cancelled (final state)
+       FINAL_R    <any>                  : Completed     produced some result
 
        === Transitions ===
 
            New states      Active states       Inactive states
 
           +---------+       +---------+                          }
-          | EMPTY_N | --+-> | EMPTY_A | ----+                    } Empty states
-          +---------+   |   +---------+     |                    }
-               |        |     |     ^       |    +----------+
-               |        |     |     |       +--> |  FINAL_* |
-               |        |     V     |       |    +----------+
-               |        |   +---------+     |                    }
-               |        |   | SINGLE  | ----+                    } JobNode states
-               |        |   +---------+     |                    }
-               |        |        |          |                    }
-               |        |        V          |                    }
-               |        |   +---------+     |                    }
-               |        +-- | SINGLE+ | ----+                    }
+          | EMPTY_N | ----> | EMPTY_A | ----+                    } Empty states
+          +---------+       +---------+     |                    }
+               |  |           |     ^       |    +----------+
+               |  |           |     |       +--> |  FINAL_* |
+               |  |           V     |       |    +----------+
+               |  |         +---------+     |                    }
+               |  |         | SINGLE  | ----+                    } JobNode states
+               |  |         +---------+     |                    }
+               |  |              |          |                    }
+               |  |              V          |                    }
+               |  |         +---------+     |                    }
+               |  +-------> | SINGLE+ | ----+                    }
                |            +---------+     |                    }
                |                 |          |
                V                 V          |
           +---------+       +---------+     |                    }
-          | LIST_N  | ----> | LIST_A  | ----+                    } NodeList states
+          | LIST_N  | ----> | LIST_A  | ----+                    } [Inactive]NodeList states
           +---------+       +---------+     |                    }
              |   |             |   |        |
              |   |    +--------+   |        |
@@ -75,16 +73,57 @@
 
 
        This state machine and its transition matrix are optimized for the common case when job is created in active
-       state (EMPTY_A) and at most one completion listener is added to it during its life-time.
+       state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
+       successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
+       state without going to COMPLETING state)
 
        Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
+
+       ---------- TIMELINE of state changes and notification in Job lifecycle ----------
+
+       | The longest possible chain of events is shown, shorter versions cut through intermediate states,
+       |  while still performing all the notifications in this order.
+
+         + Job object is created
+       ## NEW: state == EMPTY_NEW | is InactiveNodeList
+         + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
+         ~ waits for start
+         >> start / join / await invoked
+       ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
+         + onStartInternal / onStart (lazy coroutine is started)
+         ~ active coroutine is working (or scheduled to execution)
+         >> childCancelled / cancelImpl invoked
+       ## CANCELLING: state is Finishing, state.rootCause != null
+        ------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancelling=true) returns NonDisposableHandle
+        ------ new children get immediately cancelled, but are still admitted to the list
+         + onCancellation
+         + notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception)
+         + cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too)
+         ~ waits for completion of coroutine body
+         >> makeCompleting / makeCompletingOnce invoked
+       ## COMPLETING: state is Finishing, state.isCompleting == true
+        ------ new children are not admitted anymore, attachChild returns NonDisposableHandle
+         ~ waits for children
+         >> last child completes
+         - computes the final exception
+       ## SEALED: state is Finishing, state.isSealed == true
+        ------ cancel/childCancelled returns false (cannot handle exceptions anymore)
+         + cancelParent (final exception is communicated to the parent, parent incorporates it)
+         + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
+       ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
+        ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
+         + parentHandle.dispose
+         + notifyCompletion (invoke all completion listeners)
+         + onCompletionInternal / onCompleted / onCompletedExceptionally
+
+       ---------------------------------------------------------------------------------
      */
 
     // Note: use shared objects while we have no listeners
     private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
 
     @Volatile
-    private var parentHandle: DisposableHandle? = null
+    private var parentHandle: ChildHandle? = null
 
     // ------------ initialization ------------
 
@@ -103,7 +142,7 @@
         @Suppress("DEPRECATION")
         val handle = parent.attachChild(this)
         parentHandle = handle
-        // now check our state _after_ registering (see tryFinalizeStateActually order of actions)
+        // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
         if (isCompleted) {
             handle.dispose()
             parentHandle = NonDisposableHandle // release it just in case, to aid GC
@@ -141,61 +180,55 @@
 
     public final override val isCancelled: Boolean get() {
         val state = this.state
-        return state is Cancelled || (state is Finishing && state.cancelled != null)
+        return state is CompletedExceptionally || (state is Finishing && state.isCancelling)
     }
 
     // ------------ state update ------------
 
-    /**
-     * Updates current [state] of this job to the final state, invoking all necessary handlers
-     * and/or `on*` methods.
-     *
-     * Returns `false` if current state is not equal to expected.
-     * If this method succeeds, state of this job will never be changed again
-     */
-    private fun tryFinalizeState(expect: Incomplete, proposedUpdate: Any?, mode: Int): Boolean =
-        if (expect is Finishing && expect.cancelled != null) {
-            tryFinalizeCancellingState(expect, proposedUpdate, mode)
-        } else {
-            val update = coerceProposedUpdate(expect, proposedUpdate)
-            tryFinalizeStateActually(expect, update, mode)
+    // Finalizes Finishing -> Completed (terminal state) transition.
+    // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
+    private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
+        require(proposedUpdate !is Incomplete) // only incomplete -> completed transition is allowed
+        require(this.state === state) // consistency check -- it cannot change
+        require(!state.isSealed) // consistency check -- cannot be sealed yet
+        require(state.isCompleting) // consistency check -- must be marked as completing
+        val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
+        // Create the final exception and seal the state so that no more exceptions can be added
+        var suppressed = false
+        val finalException = synchronized(state) {
+            val exceptions = state.sealLocked(proposedException)
+            val rootCause = getFinalRootCause(state, exceptions)
+            if (rootCause != null) suppressed = suppressExceptions(rootCause, exceptions)
+            rootCause
+        }
+        // Create the final state object
+        val finalState = when {
+            // if we have not failed -> use proposed update value
+            finalException == null -> proposedUpdate
+            // small optimization when we can use proposedUpdate object as is on failure
+            finalException === proposedException -> proposedUpdate
+            // cancelled job final state
+            else -> CompletedExceptionally(finalException)
         }
 
-    // Finalizes Cancelling -> Cancelled transition
-    private fun tryFinalizeCancellingState(expect: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
-        /*
-         * If job is in 'cancelling' state and we're finalizing job state, we start intricate dance:
-         * 1) Synchronize on state to avoid races with concurrent
-         *    mutations (e.g. when new child is added)
-         * 2) After synchronization check we're still in the expected state
-         * 3) Aggregate final exception under the same lock which protects exceptions
-         *    collection
-         * 4) Pass it upstream
-         */
-        val finalException = synchronized(expect) {
-            if (_state.value !== expect) {
-                return false
-            }
-            if (proposedUpdate is CompletedExceptionally) {
-                expect.addExceptionLocked(proposedUpdate.cause)
-            }
-            /*
-             * Note that new exceptions cannot be added concurrently: state is guarded by lock
-             * and storage is sealed in the end, so all new exceptions will be reported separately
-             */
-            buildException(expect).also { expect.seal() }
+        // Now handle exception if parent can't handle it
+        if (finalException != null && !cancelParent(finalException)) {
+            handleJobException(finalException)
         }
-        val update = Cancelled(this, finalException ?: expect.cancelled!!.cause)
-        if (tryFinalizeStateActually(expect, update, mode)) return true
-        //  ^^^^ this CAS never fails: we're in the state when no jobs can be attached, because state is already sealed
-        val error = AssertionError("Unexpected state: ${_state.value}, expected: $expect, update: $update")
-        handleOnCompletionException(error)
-        throw error
+        // Then CAS to completed state -> it must succeed
+        require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
+        // And process all post-completion actions
+        completeStateFinalization(state, finalState, mode, suppressed)
+        return true
     }
 
-    private fun buildException(state: Finishing): Throwable? {
-        val cancelled = state.cancelled!!
-        val suppressed = state.exceptions
+    private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
+        // A case of no exceptions
+        if (exceptions.isEmpty()) {
+            // materialize cancellation exception if it was not materialized yet
+            if (state.isCancelling) return createJobCancellationException()
+            return null
+        }
         /*
          * This is a place where we step on our API limitation:
          * We can't distinguish internal JobCancellationException from our parent
@@ -203,39 +236,44 @@
          *
          * But it has negative consequences: same exception can be added as suppressed more than once.
          * Consider concurrent parent-child relationship:
-         * 1) Child throws E1 and parent throws E2
-         * 2) Parent goes to "Cancelling(E1)" and cancels child with E1
+         * 1) Child throws E1 and parent throws E2.
+         * 2) Parent goes to "Failing(E1)" and cancels child with E1
          * 3) Child goes to "Cancelling(E1)", but throws an exception E2
-         * 4) When child throws, it notifies parent that he is cancelling, adding its exception to parent list of exceptions
-         *    (again, parent don't know whether it's child exception or external exception)
-         * 5) Child builds final exception: E1 with suppressed E2, reports it to parent
+         * 4) When child throws, it notifies parent that it is failing, adding its exception to parent's list of exceptions.
+         * 5) Child builds final exception: E1 with suppressed E2, reports it to parent.
          * 6) Parent aggregates three exceptions: original E1, reported E2 and "final" E1.
          *    It filters the third exception, but adds the second one to the first one, thus adding suppressed duplicate.
          *
-         * Note that it's only happening when both parent and child throw exception simultaneously
+         * Note that it's only happening when both parent and child throw exception simultaneously.
          */
-        var rootCause = cancelled.cause
+        var rootCause = exceptions[0]
         if (rootCause is JobCancellationException) {
             val cause = unwrap(rootCause)
             rootCause = if (cause !== null) {
                 cause
             } else {
-                suppressed.firstOrNull { unwrap(it) != null } ?: return rootCause
-            }
-        }
-        // TODO it should be identity set and optimized for small footprints
-        val seenExceptions = HashSet<Throwable>(suppressed.size)
-        suppressed.forEach {
-            val unwrapped = unwrap(it)
-            if (unwrapped !== null && unwrapped !== rootCause) {
-                if (seenExceptions.add(unwrapped)) {
-                    rootCause.addSuppressedThrowable(unwrapped)
-                }
+                exceptions.firstOrNull { unwrap(it) != null } ?: return rootCause
             }
         }
         return rootCause
     }
 
+    private fun suppressExceptions(rootCause: Throwable, exceptions: List<Throwable>): Boolean {
+        if (exceptions.size <= 1) return false // nothing more to do here
+        val seenExceptions = identitySet<Throwable>(exceptions.size)
+        var suppressed = false
+        for (i in 1 until exceptions.size) {
+            val unwrapped = unwrap(exceptions[i])
+            if (unwrapped !== null && unwrapped !== rootCause) {
+                if (seenExceptions.add(unwrapped)) {
+                    rootCause.addSuppressedThrowable(unwrapped)
+                    suppressed = true
+                }
+            }
+        }
+        return suppressed
+    }
+
     private tailrec fun unwrap(exception: Throwable): Throwable? =
         if (exception is JobCancellationException) {
             val cause = exception.cause
@@ -244,29 +282,17 @@
             exception
         }
 
-    /**
-     * Tries to actually update [state] of this job to the final state and, if
-     * succeeds, disposes parent handle (detaching child from parent), and
-     * invokes all the handlers to notify on the final state transition.
-     */
-    private fun tryFinalizeStateActually(expect: Incomplete, update: Any?, mode: Int): Boolean {
-        require(update !is Incomplete) // only incomplete -> completed transition is allowed
-
-        /*
-         * We're publishing CompletedExceptionally as OpDescriptor to avoid races with parent:
-         * Job can't report exception before CAS (as it can fail), but after CAS there is a small window
-         * where the parent is considering this job (child) completed, though child has not yet reported its exception.
-         */
-        val updateValue = if (update is CompletedExceptionally) HandleExceptionOp(update) else update
-        if (!_state.compareAndSet(expect, updateValue)) return false // failed
-        if (updateValue is HandleExceptionOp) {
-            updateValue.perform(this) // help perform
-        }
-        completeStateFinalization(expect, update, mode)
+    // fast-path method to finalize normally completed coroutines without children
+    private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
+        check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
+        check(update !is CompletedExceptionally) // only for normal completion
+        if (!_state.compareAndSet(state, update)) return false
+        completeStateFinalization(state, update, mode, false)
         return true
     }
 
-    private fun completeStateFinalization(expect: Incomplete, update: Any?, mode: Int) {
+    // suppressed == true when any exceptions were suppressed while building the final completion cause
+    private fun completeStateFinalization(state: Incomplete, update: Any?, mode: Int, suppressed: Boolean) {
         /*
          * Now the job in THE FINAL state. We need to properly handle the resulting state.
          * Order of various invocations here is important.
@@ -277,68 +303,46 @@
             it.dispose() // volatile read parentHandle _after_ state was updated
             parentHandle = NonDisposableHandle // release it just in case, to aid GC
         }
-        val exceptionally = update as? CompletedExceptionally
+        val cause = (update as? CompletedExceptionally)?.cause
         /*
-         * 2) Invoke onCancellationInternal: exception handling, resource cancellation etc.
-         *    Only notify on cancellation once (expect.isCancelling)
+         * 2) Invoke onCancellation: for resource cancellation etc.
+         *    Only notify if it was not notified yet.
+         *    Note: we do not use notifyCancelling here, since we are going to invoke all completion handlers as our next step
          */
-        if (!expect.isCancelling) {
-            onCancellationInternal(exceptionally)
-        }
+        if (!state.isCancelling) onCancellation(cause)
         /*
          * 3) Invoke completion handlers: .join(), callbacks etc.
          *    It's important to invoke them only AFTER exception handling, see #208
          */
-        val cause = exceptionally?.cause
-        if (expect is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
+        if (state is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
             try {
-                expect.invoke(cause)
+                state.invoke(cause)
             } catch (ex: Throwable) {
-                handleOnCompletionException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
+                handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
             }
         } else {
-            expect.list?.notifyCompletion(cause)
+            state.list?.notifyCompletion(cause)
         }
         /*
-         * 4) Invoke onCompletionInternal: onNext(), timeout deregistration etc.
+         * 4) Invoke onCompletionInternal: onNext(), timeout de-registration etc.
          *    It should be last so all callbacks observe consistent state
          *    of the job which doesn't depend on callback scheduling.
          */
-        onCompletionInternal(update, mode)
+        onCompletionInternal(update, mode, suppressed)
     }
 
-    // when Job is in Cancelling state, it can only be promoted to Cancelled state,
-    // so if the proposed Update is not an appropriate Cancelled (preserving the cancellation cause),
-    // then the corresponding Cancelled state is constructed.
-    private fun coerceProposedUpdate(expect: Incomplete, proposedUpdate: Any?): Any? =
-        if (expect is Finishing && expect.cancelled != null && !isCorrespondinglyCancelled(expect.cancelled, proposedUpdate))
-            createCancelled(expect.cancelled, proposedUpdate) else proposedUpdate
-
-    private fun isCorrespondinglyCancelled(cancelled: Cancelled, proposedUpdate: Any?): Boolean {
-        if (proposedUpdate !is Cancelled) return false
-        // NOTE: equality comparison of causes is performed here by design, see equals of JobCancellationException
-        return proposedUpdate.cause == cancelled.cause || proposedUpdate.cause is JobCancellationException
-    }
-
-    private fun createCancelled(cancelled: Cancelled, proposedUpdate: Any?): Cancelled {
-        if (proposedUpdate !is CompletedExceptionally) return cancelled // not exception -- just use original cancelled
-        val exception = proposedUpdate.cause
-        if (cancelled.cause == exception) return cancelled // that is the cancelled we need already!
-        // That could have occurred while coroutine is being cancelled.
-        // Do not spam with JCE in suppressed exceptions
-        if (cancelled.cause !is JobCancellationException) {
-            exception.addSuppressedThrowable(cancelled.cause)
-        }
-        return Cancelled(this, exception)
+    private fun notifyCancelling(list: NodeList, cause: Throwable) {
+        // first cancel our own children
+        onCancellation(cause)
+        notifyHandlers<JobCancellingNode<*>>(list, cause)
+        // then report to the parent that we are failing
+        cancelParent(cause) // tentative failure report -- does not matter if there is no parent
     }
 
     private fun NodeList.notifyCompletion(cause: Throwable?) =
         notifyHandlers<JobNode<*>>(this, cause)
 
-    private fun notifyCancellation(list: NodeList, cause: Throwable?) =
-        notifyHandlers<JobCancellationNode<*>>(list, cause)
-
-      private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
+    private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
         var exception: Throwable? = null
         list.forEach<T> { node ->
             try {
@@ -390,13 +394,11 @@
 
     public final override fun getCancellationException(): CancellationException {
         val state = this.state
-        return when {
-            state is Finishing && state.cancelled != null ->
-                state.cancelled.cause.toCancellationException("Job is being cancelled")
-            state is Incomplete ->
-                error("Job was not completed or cancelled yet: $this")
-            state is CompletedExceptionally ->
-                state.cause.toCancellationException("Job has failed")
+        return when (state) {
+            is Finishing -> state.rootCause?.toCancellationException("Job is failing")
+                ?: error("Job is still new or active: $this")
+            is Incomplete -> error("Job is still new or active: $this")
+            is CompletedExceptionally -> state.cause.toCancellationException("Job has failed")
             else -> JobCancellationException("Job has completed normally", null, this)
         }
     }
@@ -408,14 +410,16 @@
      * Returns the cause that signals the completion of this job -- it returns the original
      * [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
      * This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
-     * [isCancelled] yet.
+     * failing yet.
+     *
+     * @suppress **This is unstable API and it is subject to change.**
      */
-    protected fun getCompletionCause(): Throwable? {
-        val state = this.state
-        return when {
-            state is Finishing && state.cancelled != null -> state.cancelled.cause
-            state is Incomplete -> error("Job was not completed or cancelled yet")
-            state is CompletedExceptionally -> state.cause
+    protected fun getCompletionCause(): Throwable? = loopOnState { state ->
+        return when (state) {
+            is Finishing -> state.rootCause
+                ?: error("Job is still new or active: $this")
+            is Incomplete -> error("Job is still new or active: $this")
+            is CompletedExceptionally -> state.cause
             else -> null
         }
     }
@@ -454,13 +458,33 @@
                     if (list == null) { // SINGLE/SINGLE+
                         promoteSingleToNodeList(state as JobNode<*>)
                     } else {
-                        if (state is Finishing && state.cancelled != null && onCancelling) {
-                            // installing cancellation handler on job that is being cancelled
-                            if (invokeImmediately) handler.invokeIt(state.cancelled.cause)
-                            return NonDisposableHandle
+                        var rootCause: Throwable? = null
+                        var handle: DisposableHandle = NonDisposableHandle
+                        if (onCancelling && state is Finishing) {
+                            synchronized(state) {
+                                // check if we are installing failing handler on job that is failing
+                                rootCause = state.rootCause // != null if we are failing
+                                // We add node to the list in two cases --- either the job is not failing
+                                // or we are adding a child to a coroutine that is not completing yet
+                                if (rootCause == null || handler.isHandlerOf<ChildHandleImpl>() && !state.isCompleting) {
+                                    // Note: add node to the list while holding lock on state (make sure it cannot change)
+                                    val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
+                                    if (!addLastAtomic(state, list, node)) return@loopOnState // retry
+                                    // just return node if we don't have to invoke handler (not failing yet)
+                                    if (rootCause == null) return node
+                                    // otherwise handler is invoked immediately out of the synchronized section & handle returned
+                                    handle = node
+                                }
+                            }
                         }
-                        val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
-                        if (addLastAtomic(state, list, node)) return node
+                        if (rootCause != null) {
+                            // Note: attachChild uses invokeImmediately, so it gets invoked when adding to failing job
+                            if (invokeImmediately) handler.invokeIt(rootCause)
+                            return handle
+                        } else {
+                            val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
+                            if (addLastAtomic(state, list, node)) return node
+                        }
                     }
                 }
                 else -> { // is complete
@@ -475,10 +499,10 @@
 
     private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
         return if (onCancelling)
-            (handler as? JobCancellationNode<*>)?.also { require(it.job === this) }
-                ?: InvokeOnCancellation(this, handler)
+            (handler as? JobCancellingNode<*>)?.also { require(it.job === this) }
+                ?: InvokeOnCancelling(this, handler)
         else
-            (handler as? JobNode<*>)?.also { require(it.job === this && it !is JobCancellationNode) }
+            (handler as? JobNode<*>)?.also { require(it.job === this && it !is JobCancellingNode) }
                 ?: InvokeOnCompletion(this, handler)
     }
 
@@ -568,98 +592,156 @@
     }
 
     /**
+     * Returns `true` for jobs that do not have a "body block" to complete and should immediately go into
+     * completing state and start waiting for children.
+     *
      * @suppress **This is unstable API and it is subject to change.**
      */
-    internal open val onCancelMode: Int get() = ON_CANCEL_MAKE_CANCELLING
+    internal open val onCancelComplete: Boolean get() = false
 
-    public override fun cancel(): Boolean = cancel(null)
+    // external cancel without cause, never invoked implicitly from internal machinery
+    public override fun cancel(): Boolean =
+        cancel(null) // must delegate here, because some classes override cancel(x)
 
-    public override fun cancel(cause: Throwable?): Boolean = when (onCancelMode) {
-        ON_CANCEL_MAKE_CANCELLING -> makeCancelling(cause)
-        ON_CANCEL_MAKE_COMPLETING -> makeCompleting(Cancelled(this, cause))
-        else -> error("Invalid onCancelMode $onCancelMode")
+    // external cancel with (optional) cause, never invoked implicitly from internal machinery
+    public override fun cancel(cause: Throwable?): Boolean =
+        cancelImpl(cause) && handlesException
+
+    // parent is reporting failure to a child
+    public final override fun parentCancelled(parentJob: Job) {
+        cancelImpl(parentJob)
     }
 
-    // we will be dispatching coroutine to process its cancellation exception, so there is no need for
-    // an extra check for Job status in MODE_CANCELLABLE
-    private fun updateStateCancelled(state: Incomplete, cause: Throwable?) =
-        tryFinalizeState(state, Cancelled(this, cause), mode = MODE_ATOMIC_DEFAULT)
+    // child was cancelled with cause
+    internal fun childCancelled(cause: Throwable): Boolean =
+        cancelImpl(cause) && handlesException
 
-    // transitions to Cancelling state
-    private fun makeCancelling(cause: Throwable?): Boolean {
+    // cause is Throwable or Job when cancelChild was invoked
+    // returns true if exception was handled, false otherwise
+    private fun cancelImpl(cause: Any?): Boolean {
+        if (onCancelComplete) {
+            // make sure it is completing, if cancelMakeCompleting returns true it means it had made it
+            // completing and had recorded exception
+            if (cancelMakeCompleting(cause)) return true
+            // otherwise just record failure via makeCancelling below
+        }
+        return makeCancelling(cause)
+    }
+
+    private fun cancelMakeCompleting(cause: Any?): Boolean {
         loopOnState { state ->
-            when (state) {
-                is Empty -> { // EMPTY_X state -- no completion handlers
-                    if (state.isActive) {
-                        promoteEmptyToNodeList(state) // this way can wrap it into Cancelling on next pass
-                    } else {
-                        // cancelling a non-started coroutine makes it immediately cancelled
-                        // (and we have no listeners to notify which makes it very simple)
-                        if (updateStateCancelled(state, cause)) return true
-                    }
-                }
-                is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
-                    promoteSingleToNodeList(state)
-                }
-                is NodeList -> { // LIST -- active list of completion handlers
-                    if (tryMakeCancelling(state, state.list, cause)) return true
-                }
-                is InactiveNodeList -> { // LIST -- inactive list of completion handlers
-                    // cancelling a non-started coroutine makes it immediately cancelled
-                    if (updateStateCancelled(state, cause))
-                        return true
-                }
-                is Finishing -> { // Completing/Cancelling the job, may cancel
-                    if (state.cancelled != null) {
-                        if (cause == null) {
-                            return true
-                        }
-                        /*
-                         * If we failed to add an exception, then `seal` was successfully called
-                         * and `seal` is called only after state update => retry is liveness-safe
-                         */
-                        if (state.addException(cause)) {
-                            return true
-                        } else {
-                            return@loopOnState
-                        }
-                    }
-                    if (tryMakeCancelling(state, state.list, cause)) return true
-                }
-                /*
-                 * Filter out duplicates due to race in the following pattern:
-                 * T1: parent -> completion sequence
-                 * T2: child -> set final state -> signal with final exception to the parent
-                 */
-                is CompletedExceptionally -> return state.cause === cause
-                else -> { // is inactive
-                    return false
-                }
+            if (state !is Incomplete || state is Finishing && state.isCompleting) {
+                return false // already completed/completing, do not even propose update
+            }
+            val proposedUpdate = CompletedExceptionally(createCauseException(cause))
+            when (tryMakeCompleting(state, proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
+                COMPLETING_ALREADY_COMPLETING -> return false
+                COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true
+                COMPLETING_RETRY -> return@loopOnState
+                else -> error("unexpected result")
             }
         }
     }
 
-    // try make expected state in cancelling on the condition that we're still in this state
-    private fun tryMakeCancelling(expect: Incomplete, list: NodeList, cause: Throwable?): Boolean {
-        val cancelled = Cancelled(this, cause)
-        if (!_state.compareAndSet(expect, Finishing(list, cancelled, false))) return false
-        onFinishingInternal(cancelled)
-        onCancellationInternal(cancelled)
-        // Materialize cause
-        notifyCancellation(list, cancelled.cause)
+    private fun createJobCancellationException() =
+        JobCancellationException("Job was cancelled", null, this)
+
+    // cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
+    private fun createCauseException(cause: Any?): Throwable = when(cause) {
+        is Throwable? -> cause ?: createJobCancellationException()
+        else -> (cause as Job).getCancellationException()
+    }
+
+    // transitions to Failing state
+    // cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
+    private fun makeCancelling(cause: Any?): Boolean {
+        var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
+        loopOnState { state ->
+            when (state) {
+                is Finishing -> { // already finishing -- collect exceptions
+                    val notifyRootCause = synchronized(state) {
+                        if (state.isSealed) return false // too late, already sealed -- cannot add exception nor mark cancelled
+                        // add exception, do nothing if parent is cancelling child that is already being cancelled
+                        val wasCancelling = state.isCancelling // will notify if was not failing
+                        // Materialize missing exception if it is the first exception (otherwise -- don't)
+                        if (cause != null || !wasCancelling) {
+                            val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
+                            state.addExceptionLocked(causeException)
+                        }
+                        // take cause for notification if it was not cancelling before
+                        state.rootCause.takeIf { !wasCancelling }
+                    }
+                    notifyRootCause?.let { notifyCancelling(state.list, it) }
+                    return true
+                }
+                is Incomplete -> {
+                    // Not yet finishing -- try to make it failing
+                    val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
+                    if (state.isActive) {
+                        // active state becomes failing
+                        if (tryMakeFailing(state, causeException)) return true
+                    } else {
+                        // non active state starts completing
+                        when (tryMakeCompleting(state, CompletedExceptionally(causeException), mode = MODE_ATOMIC_DEFAULT)) {
+                            COMPLETING_ALREADY_COMPLETING -> error("Cannot happen in $state")
+                            COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true // ok
+                            COMPLETING_RETRY -> return@loopOnState
+                            else -> error("unexpected result")
+                        }
+                    }
+                }
+                else -> return false // already complete
+            }
+        }
+    }
+
+    // Performs promotion of incomplete coroutine state to NodeList for the purpose of
+    // converting coroutine state to Failing, returns null when need to retry
+    private fun getOrPromoteFailingList(state: Incomplete): NodeList? = state.list ?:
+        when (state) {
+            is Empty -> NodeList() // we can allocate new empty list that'll get integrated into Failing state
+            is JobNode<*> -> {
+                // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
+                // correctly capture a reference to it
+                promoteSingleToNodeList(state)
+                null // retry
+            }
+            else -> error("State should have list: $state")
+        }
+
+    // try make new failing state on the condition that we're still in the expected state
+    private fun tryMakeFailing(state: Incomplete, rootCause: Throwable): Boolean {
+        check(state !is Finishing) // only for non-finishing states
+        check(state.isActive) // only for active states
+        // get state's list or else promote to list to correctly operate on child lists
+        val list = getOrPromoteFailingList(state) ?: return false
+        // Create failing state (with rootCause!)
+        val failing = Finishing(list, false, rootCause)
+        if (!_state.compareAndSet(state, failing)) return false
+        // Notify listeners
+        notifyCancelling(list, rootCause)
         return true
     }
 
     /**
+     * This function is used by [CompletableDeferred.complete] (and exceptionally) and by [JobImpl.cancel].
+     * It returns `false` on repeated invocation (when this job is already completing).
+     *
      * @suppress **This is unstable API and it is subject to change.**
      */
-    internal fun makeCompleting(proposedUpdate: Any?): Boolean =
-        when (makeCompletingInternal(proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
-            COMPLETING_ALREADY_COMPLETING -> false
-            else -> true
+    internal fun makeCompleting(proposedUpdate: Any?): Boolean = loopOnState { state ->
+        when (tryMakeCompleting(state, proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
+            COMPLETING_ALREADY_COMPLETING -> return false
+            COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true
+            COMPLETING_RETRY -> return@loopOnState
+            else -> error("unexpected result")
         }
+    }
 
     /**
+     * This function is used by [AbstractCoroutine.resume].
+     * It throws exception on repeated invocation (when this job is already completing).
+     *
      * Returns:
      * * `true` if state was updated to completed/cancelled;
      * * `false` if made completing or it is cancelling and is waiting for children.
@@ -667,98 +749,106 @@
      * @throws IllegalStateException if job is already complete or completing
      * @suppress **This is unstable API and it is subject to change.**
      */
-    internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean =
-        when (makeCompletingInternal(proposedUpdate, mode)) {
-            COMPLETING_COMPLETED -> true
-            COMPLETING_WAITING_CHILDREN -> false
-            else -> throw IllegalStateException("Job $this is already complete or completing, " +
+    internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean = loopOnState { state ->
+        when (tryMakeCompleting(state, proposedUpdate, mode)) {
+            COMPLETING_ALREADY_COMPLETING -> throw IllegalStateException("Job $this is already complete or completing, " +
                 "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
-        }
-
-    private fun makeCompletingInternal(proposedUpdate: Any?, mode: Int): Int {
-        loopOnState { state ->
-            if (state !is Incomplete)
-                return COMPLETING_ALREADY_COMPLETING
-            if (state is Finishing && state.completing)
-                return COMPLETING_ALREADY_COMPLETING
-            val child: ChildJob? = firstChild(state) ?: // or else complete immediately w/o children
-            when {
-                state !is Finishing && hasOnFinishingHandler(proposedUpdate) -> null // unless it has onFinishing handler
-                tryFinalizeState(state, proposedUpdate, mode) -> return COMPLETING_COMPLETED
-                else -> return@loopOnState
-            }
-            val list = state.list ?: // must promote to list to correctly operate on child lists
-            when (state) {
-                is Empty -> {
-                    promoteEmptyToNodeList(state)
-                    return@loopOnState // retry
-                }
-                is JobNode<*> -> {
-                    promoteSingleToNodeList(state)
-                    return@loopOnState // retry
-                }
-                else -> error("Unexpected state with an empty list: $state")
-            }
-            // cancel all children in list on exceptional completion
-            if (proposedUpdate is CompletedExceptionally)
-                child?.cancelChildrenInternal(proposedUpdate.cause)
-            // switch to completing state
-            val cancelled = (state as? Finishing)?.cancelled ?: (proposedUpdate as? Cancelled)
-            val completing = Finishing(list, cancelled, true)
-            if (_state.compareAndSet(state, completing)) {
-                (state as? Finishing)?.transferExceptions(completing)
-                if (state !is Finishing) onFinishingInternal(proposedUpdate)
-                if (child != null && tryWaitForChild(child, proposedUpdate))
-                    return COMPLETING_WAITING_CHILDREN
-                if (tryFinalizeState(completing, proposedUpdate, mode))
-                    return COMPLETING_COMPLETED
-            }
+            COMPLETING_COMPLETED -> return true
+            COMPLETING_WAITING_CHILDREN -> return false
+            COMPLETING_RETRY -> return@loopOnState
+            else -> error("unexpected result")
         }
     }
 
-    private tailrec fun ChildJob.cancelChildrenInternal(cause: Throwable) {
-        childJob.cancel(JobCancellationException("Child job was cancelled because of parent failure", cause, childJob))
-        nextChild()?.cancelChildrenInternal(cause)
+    private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?, mode: Int): Int {
+        if (state !is Incomplete)
+            return COMPLETING_ALREADY_COMPLETING
+        /*
+         * FAST PATH -- no children to wait for && simple state (no list) && not failing => can complete immediately
+         * Failures always have to go through Finishing state to serialize exception handling.
+         * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
+         * which may miss unhandled exception.
+         */
+        if ((state is Empty || state is JobNode<*>) && state !is ChildHandleImpl && proposedUpdate !is CompletedExceptionally) {
+            if (!tryFinalizeSimpleState(state, proposedUpdate, mode)) return COMPLETING_RETRY
+            return COMPLETING_COMPLETED
+        }
+        // get state's list or else promote to list to correctly operate on child lists
+        val list = getOrPromoteFailingList(state) ?: return COMPLETING_RETRY
+        // promote to Finishing state if we are not in it yet
+        // This promotion has to be atomic w.r.t. state change, so that a coroutine that is not active yet
+        // atomically transitions to finishing & completing state
+        val finishing = state as? Finishing ?: Finishing(list, false, null)
+        // must synchronize updates to finishing state
+        var notifyRootCause: Throwable? = null
+        synchronized(finishing) {
+            // check if this state is already completing
+            if (finishing.isCompleting) return COMPLETING_ALREADY_COMPLETING
+            // mark as completing
+            finishing.isCompleting = true
+            // if we need to promote to finishing then atomically do it here.
+            // We do it as early as possible while still holding the lock. This ensures that we cancelImpl asap
+            // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
+            if (finishing !== state) {
+                if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
+            }
+            // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
+            require(!finishing.isSealed) // cannot be sealed
+            // add new proposed exception to the finishing state
+            val wasCancelling = finishing.isCancelling
+            (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
+            // If it just becomes failing --> must process failing notifications
+            notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
+        }
+        // process failing notification here -- it cancels all the children _before_ we start to wait for them (sic!!!)
+        notifyRootCause?.let { notifyCancelling(list, it) }
+        // now wait for children
+        val child = firstChild(state)
+        if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
+            return COMPLETING_WAITING_CHILDREN
+        // otherwise -- we have no children left (all were already cancelled?)
+        if (tryFinalizeFinishingState(finishing, proposedUpdate, mode))
+            return COMPLETING_COMPLETED
+        // otherwise retry
+        return COMPLETING_RETRY
     }
 
     private val Any?.exceptionOrNull: Throwable?
         get() = (this as? CompletedExceptionally)?.cause
 
     private fun firstChild(state: Incomplete) =
-        state as? ChildJob ?: state.list?.nextChild()
+        state as? ChildHandleImpl ?: state.list?.nextChild()
 
     // return false when there is no more incomplete children to wait
-    private tailrec fun tryWaitForChild(child: ChildJob, proposedUpdate: Any?): Boolean {
-        val handle = child.childJob.invokeOnCompletion(invokeImmediately = false,
-            handler = ChildCompletion(this, child, proposedUpdate).asHandler)
+    // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
+    private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleImpl, proposedUpdate: Any?): Boolean {
+        val handle = child.childJob.invokeOnCompletion(
+            invokeImmediately = false,
+            handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
+        )
         if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
         val nextChild = child.nextChild() ?: return false
-        return tryWaitForChild(nextChild, proposedUpdate)
+        return tryWaitForChild(state, nextChild, proposedUpdate)
     }
 
-    /**
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    internal fun continueCompleting(lastChild: ChildJob, proposedUpdate: Any?) {
-        loopOnState { state ->
-            if (state !is Finishing)
-                throw IllegalStateException("Job $this is found in expected state while completing with $proposedUpdate", proposedUpdate.exceptionOrNull)
-            // figure out if we need to wait for next child
-            val waitChild = lastChild.nextChild()
-            // try wait for next child
-            if (waitChild != null && tryWaitForChild(waitChild, proposedUpdate)) return // waiting for next child
-            // no more children to wait -- try update state
-            if (tryFinalizeState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return
-        }
+    // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
+    private fun continueCompleting(state: Finishing, lastChild: ChildHandleImpl, proposedUpdate: Any?) {
+        require(this.state === state) // consistency check -- it cannot change while we are waiting for children
+        // figure out if we need to wait for next child
+        val waitChild = lastChild.nextChild()
+        // try wait for next child
+        if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
+        // no more children to wait -- try update state
+        if (tryFinalizeFinishingState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return
     }
 
-    private fun LockFreeLinkedListNode.nextChild(): ChildJob? {
+    private fun LockFreeLinkedListNode.nextChild(): ChildHandleImpl? {
         var cur = this
         while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
         while (true) {
             cur = cur.nextNode
             if (cur.isRemoved) continue
-            if (cur is ChildJob) return cur
+            if (cur is ChildHandleImpl) return cur
             if (cur is NodeList) return null // checked all -- no more children
         }
     }
@@ -766,16 +856,26 @@
     public final override val children: Sequence<Job> get() = buildSequence {
         val state = this@JobSupport.state
         when (state) {
-            is ChildJob -> yield(state.childJob)
+            is ChildHandleImpl -> yield(state.childJob)
             is Incomplete -> state.list?.let { list ->
-                list.forEach<ChildJob> { yield(it.childJob) }
+                list.forEach<ChildHandleImpl> { yield(it.childJob) }
             }
         }
     }
 
     @Suppress("OverridingDeprecatedMember")
-    public final override fun attachChild(child: Job): DisposableHandle =
-        invokeOnCompletion(onCancelling = true, handler = ChildJob(this, child).asHandler)
+    public final override fun attachChild(child: ChildJob): ChildHandle {
+        /*
+         * Note: This function attaches a special ChildHandleImpl node object. This node object
+         * is handled in a special way on completion of the coroutine (we wait for all of them) and
+         * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
+         * if the job is already failing. In the "failing" state the child is attached under the state lock.
+         * This is required to properly wait for all children before completion and to provide a linearizable
+         * hierarchy view: if a child is attached when the job is failing, that child receives an immediate
+         * notification of the failure, but the parent *will* wait for it before completing and handle its exception.
+         */
+        return invokeOnCompletion(onCancelling = true, handler = ChildHandleImpl(this, child).asHandler) as ChildHandle
+    }
 
     @Suppress("OverridingDeprecatedMember")
     public final override fun cancelChildren(cause: Throwable?) {
@@ -792,40 +892,51 @@
     }
 
     /**
-     * This function is invoked once when job is cancelled or is completed.
-     * It's an optimization for [invokeOnCompletion] with `onCancelling` set to `true`.
+     * This function is invoked once when job is failing or is completed.
+     * It's an optimization for [invokeOnCompletion] with `onCancelling` set to `true`.
      *
-     * @param exceptionally not null when the the job was cancelled or completed exceptionally,
-     *               null when it has completed normally.
      * @suppress **This is unstable API and it is subject to change.*
      */
-    internal open fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
-        // TODO rename to "onCancelling"
+    protected open fun onCancellation(cause: Throwable?) {}
+
+    /**
+     * When this property is `true`, the parent fails on the failure of this job.
+     *
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open val cancelsParent: Boolean get() = false
+
+    /**
+     * Returns `true` for jobs that handle their exceptions via [handleJobException] or integrate them
+     * into the job's result via [onCompletionInternal]. The only instance of the [Job] that does not
+     * handle its exceptions is [JobImpl].
+     *
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open val handlesException: Boolean get() = true
+
+    /**
+     * This method is invoked **exactly once** when the final exception of the job is determined
+     * and before it becomes complete. At the moment of invocation the job and all its children are complete.
+     *
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open fun handleJobException(exception: Throwable) {}
+
+    private fun cancelParent(cause: Throwable): Boolean {
+        if (cause is CancellationException) return true
+        if (!cancelsParent) return false
+        return parentHandle?.childCancelled(cause) == true
     }
 
     /**
-     * Whether job has [onFinishingInternal] handler for given [update]
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    internal open fun hasOnFinishingHandler(update: Any?) = false
-
-    /**
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    internal open fun onFinishingInternal(update: Any?) {}
-
-    /**
-     * Method which is invoked once Job becomes [Cancelled] or [CompletedExceptionally].
-     * It's guaranteed that at the moment of invocation the job and all its children are complete.
-     */
-    internal open fun handleJobException(exception: Throwable) {}
-
-    /**
      * Override for post-completion actions that need to do something with the state.
+     * @param state the final state.
      * @param mode completion mode.
+     * @param suppressed true when any exceptions were suppressed while building the final completion cause.
      * @suppress **This is unstable API and it is subject to change.**
      */
-    internal open fun onCompletionInternal(state: Any?, mode: Int) {}
+    internal open fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {}
 
     // for nicer debugging
     public override fun toString(): String =
@@ -840,89 +951,99 @@
         val state = this.state
         return when (state) {
             is Finishing -> buildString {
-                if (state.cancelled != null) append("Cancelling")
-                if (state.completing) append("Completing")
+                when {
+                    state.isCancelling -> append("Cancelling")
+                    else -> append("Active")
+                }
+                if (state.isCompleting) append("Completing")
             }
             is Incomplete -> if (state.isActive) "Active" else "New"
-            is Cancelled -> "Cancelled"
-            is CompletedExceptionally -> "CompletedExceptionally"
+            is CompletedExceptionally -> "Cancelled"
             else -> "Completed"
         }
     }
 
-    // Cancelling or Completing
+    // Completing, Failing, Cancelling states,
+    // All updates are guarded by synchronized(this), reads are volatile
     @Suppress("UNCHECKED_CAST")
     private class Finishing(
         override val list: NodeList,
-        @JvmField val cancelled: Cancelled?, /* != null when cancelling */
-        @JvmField val completing: Boolean /* true when completing */
+        @Volatile
+        @JvmField var isCompleting: Boolean,
+        @Volatile
+        @JvmField var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
     ) : SynchronizedObject(), Incomplete {
-        override val isActive: Boolean get() = cancelled == null
+        @Volatile
+        private var _exceptionsHolder: Any? = null // Contains null | Throwable | ArrayList | SEALED
 
+        // Note: cannot be modified when sealed
+        val isSealed: Boolean get() = _exceptionsHolder === SEALED
+        val isCancelling: Boolean get() = rootCause != null
+        override val isActive: Boolean get() = rootCause == null // !isCancelling
+
+        // Seals current state and returns list of exceptions
         // guarded by `synchronized(this)`
-        val exceptions: List<Throwable> get() = when(_exceptionsHolder) {
-            NOT_INITIALIZED -> emptyList()
-            is Throwable -> listOf(_exceptionsHolder as Throwable) // EA should handle this
-            else -> (_exceptionsHolder as List<Throwable>)
+        fun sealLocked(proposedException: Throwable?): List<Throwable> {
+            val eh = _exceptionsHolder // volatile read
+            val list = when(eh) {
+                null -> allocateList()
+                is Throwable -> allocateList().also { it.add(eh) }
+                is ArrayList<*> -> eh as ArrayList<Throwable>
+                else -> error("State is $eh") // already sealed -- cannot happen
+            }
+            val rootCause = this.rootCause // volatile read
+            rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
+            if (proposedException != null && proposedException != rootCause) list.add(proposedException)
+            _exceptionsHolder = SEALED
+            return list
         }
 
         // guarded by `synchronized(this)`
-        // Contains null | NOT_INITIALIZED | Throwable | ArrayList
-        private var _exceptionsHolder: Any? = if (cancelled == null) null else NOT_INITIALIZED
-
-        fun addException(exception: Throwable): Boolean =
-            synchronized(this) {
-                addExceptionLocked(exception)
+        fun addExceptionLocked(exception: Throwable) {
+            val rootCause = this.rootCause // volatile read
+            if (rootCause == null) {
+                this.rootCause = exception
+                return
             }
-
-        // guarded by `synchronized(this)`
-        fun addExceptionLocked(exception: Throwable): Boolean =
-            when (_exceptionsHolder) {
-                null -> false
-                NOT_INITIALIZED -> {
-                    _exceptionsHolder = exception
-                    true
-                }
+            if (exception === rootCause) return // nothing to do
+            val eh = _exceptionsHolder // volatile read
+            when (eh) {
+                null -> _exceptionsHolder = exception
                 is Throwable -> {
-                    val previous = _exceptionsHolder
-                    _exceptionsHolder = ArrayList<Any?>(4).apply {
-                        add(previous)
+                    if (exception === eh) return // nothing to do
+                    _exceptionsHolder = allocateList().apply {
+                        add(eh)
                         add(exception)
 
                     }
-                    true
                 }
-                else -> (_exceptionsHolder as ArrayList<Throwable>).add(exception)
-            }
-
-        /**
-         * Seals current state. After [seal] call all consecutive calls to [addException]
-         * return `false` forcing callers to handle pending exception by themselves.
-         * This call should be guarded by `synchronized(this)`
-         */
-        fun seal() {
-            _exceptionsHolder = null
-        }
-
-        fun transferExceptions(to: Finishing) {
-            synchronized(this) {
-                synchronized(to) {
-                    val holder = _exceptionsHolder
-                    when(holder) {
-                        // Note: "to" cannot be sealed at this point and adding exception there mush succeed.
-                        is Throwable -> require(to.addExceptionLocked(holder))
-                        is List<*> -> holder.forEach {
-                            require(to.addExceptionLocked(it as Throwable))
-                        }
-                    }
-                    seal()
-                }
+                is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception)
+                else -> error("State is $eh") // already sealed -- cannot happen
             }
         }
+
+        private fun allocateList() = ArrayList<Throwable>(4)
+
+        override fun toString(): String =
+            "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$_exceptionsHolder, list=$list]"
     }
 
     private val Incomplete.isCancelling: Boolean
-        get() = this is Finishing && cancelled != null
+        get() = this is Finishing && isCancelling
+
+    // Used by parent that is waiting for child completion
+    private class ChildCompletion(
+        private val parent: JobSupport,
+        private val state: Finishing,
+        private val child: ChildHandleImpl,
+        private val proposedUpdate: Any?
+    ) : JobNode<Job>(child.childJob) {
+        override fun invoke(cause: Throwable?) {
+            parent.continueCompleting(state, child, proposedUpdate)
+        }
+        override fun toString(): String =
+            "ChildCompletion[$child, $proposedUpdate]"
+    }
 
     /*
      * =================================================================================================
@@ -1019,34 +1140,20 @@
         else
             block.startCoroutineCancellable(state as T, select.completion)
     }
-
-    class HandleExceptionOp(val original: CompletedExceptionally) : OpDescriptor() {
-
-        override fun perform(affected: Any?): Any? {
-            val job = (affected as JobSupport)
-            if (job._state.compareAndSet(this, original)) {
-                job.handleJobException(original.cause)
-            }
-
-            return null
-        }
-    }
 }
 
 // --------------- helper classes & constants for job implementation
 
-internal const val ON_CANCEL_MAKE_CANCELLING = 0
-internal const val ON_CANCEL_MAKE_COMPLETING = 1
-
 private const val COMPLETING_ALREADY_COMPLETING = 0
 private const val COMPLETING_COMPLETED = 1
 private const val COMPLETING_WAITING_CHILDREN = 2
+private const val COMPLETING_RETRY = 3
 
 private const val RETRY = -1
 private const val FALSE = 0
 private const val TRUE = 1
 
-private val NOT_INITIALIZED = Symbol("NOT_INITIALIZED")
+private val SEALED = Symbol("SEALED")
 
 private val EMPTY_NEW = Empty(false)
 private val EMPTY_ACTIVE = Empty(true)
@@ -1058,13 +1165,8 @@
 
 internal class JobImpl(parent: Job? = null) : JobSupport(true) {
     init { initParentJobInternal(parent) }
-    override val onCancelMode: Int get() = ON_CANCEL_MAKE_COMPLETING
-
-    override fun cancel(cause: Throwable?): Boolean {
-        // JobImpl can't handle an exception, thus always returns false
-        super.cancel(cause)
-        return false
-    }
+    override val onCancelComplete get() = true
+    override val handlesException: Boolean get() = false
 }
 
 // -------- invokeOnCompletion nodes
@@ -1159,51 +1261,41 @@
 // -------- invokeOnCancellation nodes
 
 /**
- * Marker for node that shall be invoked on cancellation (in _cancelling_ state).
- * **Note: may be invoked multiple times during cancellation.**
+ * Marker for node that shall be invoked in _failing_ state.
+ * **Note: may be invoked multiple times.**
  */
-internal abstract class JobCancellationNode<out J : Job>(job: J) : JobNode<J>(job)
+internal abstract class JobCancellingNode<out J : Job>(job: J) : JobNode<J>(job)
 
-private class InvokeOnCancellation(
+private class InvokeOnCancelling(
     job: Job,
     private val handler: CompletionHandler
-) : JobCancellationNode<Job>(job)  {
+) : JobCancellingNode<Job>(job)  {
     // delegate handler shall be invoked at most once, so here is an additional flag
-    private val _invoked = atomic(0)
+    private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
     override fun invoke(cause: Throwable?) {
         if (_invoked.compareAndSet(0, 1)) handler.invoke(cause)
     }
-    override fun toString() = "InvokeOnCancellation[$classSimpleName@$hexAddress]"
+    override fun toString() = "InvokeOnCancelling[$classSimpleName@$hexAddress]"
 }
 
-internal class ChildJob(
+internal class ChildHandleImpl(
     parent: JobSupport,
-    @JvmField val childJob: Job
-) : JobCancellationNode<JobSupport>(parent) {
-    override fun invoke(cause: Throwable?) {
-        // Always materialize the actual instance of parent's completion exception and cancel child with it
-        childJob.cancel(job.getCancellationException())
-    }
-    override fun toString(): String = "ChildJob[$childJob]"
+    @JvmField val childJob: ChildJob
+) : JobCancellingNode<JobSupport>(parent), ChildHandle {
+    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
+    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
+    override fun toString(): String = "ChildHandle[$childJob]"
 }
 
-// Same as ChildJob, but for cancellable continuation
+// Same as ChildHandleImpl, but for cancellable continuation
 internal class ChildContinuation(
     parent: Job,
     @JvmField val child: AbstractContinuation<*>
-) : JobCancellationNode<Job>(parent) {
+) : JobCancellingNode<Job>(parent) {
     override fun invoke(cause: Throwable?) {
         child.cancel(job.getCancellationException())
     }
-    override fun toString(): String = "ChildContinuation[$child]"
+    override fun toString(): String =
+        "ChildContinuation[$child]"
 }
 
-private class ChildCompletion(
-    private val parent: JobSupport,
-    private val child: ChildJob,
-    private val proposedUpdate: Any?
-) : JobNode<Job>(child.childJob) {
-    override fun invoke(cause: Throwable?) {
-        parent.continueCompleting(child, proposedUpdate)
-    }
-}
diff --git a/common/kotlinx-coroutines-core-common/src/NonCancellable.kt b/common/kotlinx-coroutines-core-common/src/NonCancellable.kt
index d87b929..1d60aa4 100644
--- a/common/kotlinx-coroutines-core-common/src/NonCancellable.kt
+++ b/common/kotlinx-coroutines-core-common/src/NonCancellable.kt
@@ -116,7 +116,7 @@
      * @suppress **This an internal API and should not be used from general code.**
      */
     @InternalCoroutinesApi
-    override fun cancel(cause: Throwable?): Boolean = false
+    override fun cancel(cause: Throwable?): Boolean = false // never handles exceptions
 
     /**
      * Always returns [emptySequence].
@@ -127,12 +127,12 @@
         get() = emptySequence()
 
     /**
-     * Always returns no-op handle and does not do anything.
+     * Always returns [NonDisposableHandle] and does not do anything.
      * @suppress **This an internal API and should not be used from general code.**
      */
-    @Suppress("OverridingDeprecatedMember")
+    @Suppress("EXPOSED_FUNCTION_RETURN_TYPE", "EXPOSED_PARAMETER_TYPE")
     @InternalCoroutinesApi
-    override fun attachChild(child: Job): DisposableHandle = NonDisposableHandle
+    override fun attachChild(child: ChildJob): ChildHandle = NonDisposableHandle
 
     /**
      * Does not do anything.
diff --git a/common/kotlinx-coroutines-core-common/src/Scheduled.kt b/common/kotlinx-coroutines-core-common/src/Scheduled.kt
index 251696e..1545843 100644
--- a/common/kotlinx-coroutines-core-common/src/Scheduled.kt
+++ b/common/kotlinx-coroutines-core-common/src/Scheduled.kt
@@ -36,8 +36,25 @@
     replaceWith = ReplaceWith("withTimeout(unit.toMillis(time), block)")
 )
 public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend CoroutineScope.() -> T): T =
-    withTimeout(time.convertToMillis(unit), block)
+    withTimeout(unit.toMillis(time), block)
 
+/**
+ * Runs a given suspending block of code inside a coroutine with a specified timeout and returns
+ * `null` if this timeout was exceeded.
+ *
+ * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
+ * cancellable suspending function inside the block throws [TimeoutCancellationException].
+ * **NB**: this method is exception-unsafe. Even if the code in the block suppresses [TimeoutCancellationException], this
+ * invocation of `withTimeoutOrNull` still returns `null` and thrown exception will be ignored.
+ *
+ * The sibling function that throws exception on timeout is [withTimeout].
+ * Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
+ *
+ * This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
+ * implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
+ *
+ * @param time timeout time in milliseconds.
+ */
 @Deprecated(level = DeprecationLevel.HIDDEN, message = "binary compatibility")
 public suspend fun <T> withTimeoutOrNull(time: Int, block: suspend CoroutineScope.() -> T): T? =
     withTimeoutOrNull(time.toLong(), TimeUnit.MILLISECONDS, block)
diff --git a/common/kotlinx-coroutines-core-common/src/Timeout.kt b/common/kotlinx-coroutines-core-common/src/Timeout.kt
index 0403d83..5c9ea07 100644
--- a/common/kotlinx-coroutines-core-common/src/Timeout.kt
+++ b/common/kotlinx-coroutines-core-common/src/Timeout.kt
@@ -96,7 +96,7 @@
     }
 
     @Suppress("UNCHECKED_CAST")
-    internal override fun onCompletionInternal(state: Any?, mode: Int) {
+    internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
         if (state is CompletedExceptionally)
             uCont.resumeUninterceptedWithExceptionMode(state.cause, mode)
         else
diff --git a/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt b/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt
index 96d71e0..3da7fd6 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt
@@ -111,17 +111,16 @@
     override val channel: SendChannel<E>
         get() = this
 
-    override fun cancel(): Boolean = super.cancel()
-    public override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
+    override fun cancel(cause: Throwable?): Boolean {
+        val wasCancelled = _channel.cancel(cause)
+        if (wasCancelled) super.cancel(cause) // cancel the job
+        return wasCancelled
+    }
 
-    override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
-        val cause = exceptionally?.cause
-        val processed = when (exceptionally) {
-            is Cancelled -> _channel.cancel(cause) // producer coroutine was cancelled -- cancel channel
-            else -> _channel.close(cause) // producer coroutine has completed -- close channel
-        }
-        if (!processed && cause != null)
-            handleCoroutineException(context, cause, this)
+    override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
+        val cause = (state as? CompletedExceptionally)?.cause
+        val processed = _channel.close(cause)
+        if (cause != null && !processed && suppressed) handleExceptionViaHandler(context, cause)
     }
 }
 
diff --git a/common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt b/common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt
index 1e3c4d0..dfb5112 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt
@@ -14,6 +14,11 @@
 ) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
     val channel: Channel<E> get() = this
 
-    override fun cancel(): Boolean = super.cancel()
-    override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
+    override fun cancel() = cancel(null)
+
+    override fun cancel(cause: Throwable?): Boolean {
+        val wasCancelled = _channel.cancel(cause)
+        if (wasCancelled) super.cancel(cause) // cancel the job
+        return wasCancelled
+    }
 }
diff --git a/common/kotlinx-coroutines-core-common/src/channels/Produce.kt b/common/kotlinx-coroutines-core-common/src/channels/Produce.kt
index ba93b00..b7e75de 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/Produce.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/Produce.kt
@@ -168,18 +168,12 @@
 private class ProducerCoroutine<E>(
     parentContext: CoroutineContext, channel: Channel<E>
 ) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E>, ProducerJob<E> {
-
     override val isActive: Boolean
         get() = super<ChannelCoroutine>.isActive
 
-    override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
-        val cause = exceptionally?.cause
-        val processed = when (exceptionally) {
-            // producer coroutine was cancelled -- cancel channel, but without cause if it was closed without cause
-            is Cancelled -> _channel.cancel(if (cause is CancellationException) null else cause)
-            else -> _channel.close(cause) // producer coroutine has completed -- close channel
-        }
-        if (!processed && cause != null)
-            handleCoroutineException(context, cause, this)
+    override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
+        val cause = (state as? CompletedExceptionally)?.cause
+        val processed = _channel.close(cause)
+        if (cause != null && !processed && suppressed) handleExceptionViaHandler(context, cause)
     }
 }
diff --git a/common/kotlinx-coroutines-core-common/src/internal/Concurrent.common.kt b/common/kotlinx-coroutines-core-common/src/internal/Concurrent.common.kt
index 682fbbe..be4b57c 100644
--- a/common/kotlinx-coroutines-core-common/src/internal/Concurrent.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/internal/Concurrent.common.kt
@@ -20,3 +20,5 @@
 }
 
 internal expect inline fun <T> ReentrantLock.withLock(action: () -> T): T
+
+internal expect fun <E> identitySet(expectedSize: Int): MutableSet<E>
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt b/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt
index e2e7475..d087bc8 100644
--- a/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt
+++ b/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt
@@ -7,20 +7,13 @@
 import kotlinx.coroutines.experimental.*
 import kotlin.coroutines.experimental.*
 
-internal class ScopeOwnerCoroutine<R>(
+/**
+ * This is a coroutine instance that is created by [coroutineScope] builder.
+ * It is just a vanilla instance of [AbstractCoroutine].
+ */
+internal class ScopeCoroutine<R>(
     parentContext: CoroutineContext
-) : AbstractCoroutine<R>(parentContext, true), CoroutineScope {
-
-    override val coroutineContext: CoroutineContext = parentContext + this
-
-    /*
-     * Always return true, so final exception is in the scope before its completion.
-     */
-    override fun cancel(cause: Throwable?): Boolean {
-        super.cancel(cause)
-        return true
-    }
-}
+) : AbstractCoroutine<R>(parentContext, true)
 
 internal class ContextScope(context: CoroutineContext) : CoroutineScope {
     override val coroutineContext: CoroutineContext = context
diff --git a/common/kotlinx-coroutines-core-common/src/selects/Select.kt b/common/kotlinx-coroutines-core-common/src/selects/Select.kt
index ee9fe73..bbec635 100644
--- a/common/kotlinx-coroutines-core-common/src/selects/Select.kt
+++ b/common/kotlinx-coroutines-core-common/src/selects/Select.kt
@@ -289,19 +289,19 @@
     private fun initCancellability() {
         val parent = context[Job] ?: return
         val newRegistration = parent.invokeOnCompletion(
-            onCancelling = true, handler = SelectOnCancellation(parent).asHandler)
+            onCancelling = true, handler = SelectOnCancelling(parent).asHandler)
         parentHandle = newRegistration
         // now check our state _after_ registering
         if (isSelected) newRegistration.dispose()
     }
 
-    private inner class SelectOnCancellation(job: Job) : JobCancellationNode<Job>(job) {
+    private inner class SelectOnCancelling(job: Job) : JobCancellingNode<Job>(job) {
         // Note: may be invoked multiple times, but only the first trySelect succeeds anyway
         override fun invoke(cause: Throwable?) {
             if (trySelect(null))
                 resumeSelectCancellableWithException(job.getCancellationException())
         }
-        override fun toString(): String = "SelectOnCancellation[${this@SelectBuilderImpl}]"
+        override fun toString(): String = "SelectOnCancelling[${this@SelectBuilderImpl}]"
     }
 
     private val state: Any? get() {
diff --git a/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt b/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt
index 80a5705..06e45f5 100644
--- a/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt
@@ -4,7 +4,6 @@
 
 package kotlinx.coroutines.experimental
 
-import kotlin.coroutines.experimental.*
 import kotlin.test.*
 
 class AbstractCoroutineTest : TestBase() {
diff --git a/common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt b/common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt
index 50ce50d..83be9e6 100644
--- a/common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt
@@ -20,7 +20,7 @@
         expect(2)
         assertTrue(!d.isActive && !d.isCompleted)
         assertEquals(d.await(), 42)
-        assertTrue(!d.isActive && d.isCompleted && !d.isCompletedExceptionally)
+        assertTrue(!d.isActive && d.isCompleted && !d.isCancelled)
         expect(4)
         assertEquals(d.await(), 42) // second await -- same result
         finish(5)
@@ -38,7 +38,7 @@
         expect(2)
         assertTrue(!d.isActive && !d.isCompleted)
         assertEquals(d.await(), 42)
-        assertTrue(!d.isActive && d.isCompleted && !d.isCompletedExceptionally)
+        assertTrue(!d.isActive && d.isCompleted && !d.isCancelled)
         expect(5)
         assertEquals(d.await(), 42) // second await -- same result
         finish(6)
@@ -67,7 +67,7 @@
         expect(5)
         assertTrue(!d.isActive && !d.isCompleted)
         assertEquals(d.await(), 42) // starts computing
-        assertTrue(!d.isActive && d.isCompleted && !d.isCompletedExceptionally)
+        assertTrue(!d.isActive && d.isCompleted && !d.isCancelled)
         finish(8)
     }
 
@@ -113,7 +113,7 @@
         try {
             d.await() // will throw IOException
         } catch (e: TestException) {
-            assertTrue(!d.isActive && d.isCompleted && d.isCompletedExceptionally && !d.isCancelled)
+            assertTrue(!d.isActive && d.isCompleted && d.isCancelled)
             expect(4)
         }
         finish(5)
@@ -133,7 +133,7 @@
         expect(3)
         assertTrue(!d.start())
         yield() // yield to started coroutine
-        assertTrue(!d.isActive && d.isCompleted && !d.isCompletedExceptionally) // and it finishes
+        assertTrue(!d.isActive && d.isCompleted && !d.isCancelled) // and it finishes
         expect(5)
         assertEquals(d.await(), 42) // await sees result
         finish(6)
@@ -151,7 +151,7 @@
         expect(2)
         assertTrue(!d.isActive && !d.isCompleted)
         assertTrue(d.cancel())
-        assertTrue(!d.isActive && d.isCompleted && d.isCompletedExceptionally && d.isCancelled)
+        assertTrue(!d.isActive && d.isCompleted && d.isCancelled)
         assertTrue(!d.cancel())
         assertTrue(!d.start())
         finish(3)
@@ -179,9 +179,9 @@
         expect(5)
         assertTrue(d.isActive && !d.isCompleted && !d.isCancelled)
         assertTrue(d.cancel())
-        assertTrue(!d.isActive && !d.isCompletedExceptionally && d.isCancelled) // cancelling !
+        assertTrue(!d.isActive && d.isCancelled) // cancelling !
         assertTrue(d.cancel())
-        assertTrue(!d.isActive && !d.isCompletedExceptionally && d.isCancelled) // still cancelling
+        assertTrue(!d.isActive && d.isCancelled) // still cancelling
         finish(6)
         assertEquals(d.await(), 42) // await shall throw CancellationException
         expectUnreached()
diff --git a/common/kotlinx-coroutines-core-common/test/AwaitTest.kt b/common/kotlinx-coroutines-core-common/test/AwaitTest.kt
index 71a3823..8a56fb1 100644
--- a/common/kotlinx-coroutines-core-common/test/AwaitTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/AwaitTest.kt
@@ -86,7 +86,7 @@
         }
 
         yield()
-        require(d.isCompleted && d2.isCompletedExceptionally && d3.isActive)
+        require(d.isCompleted && d2.isCancelled && d3.isActive)
         d3.cancel()
         finish(6)
     }
@@ -203,9 +203,9 @@
     @Test
     fun testAwaitAllFullyCompletedExceptionally() = runTest {
         val d1 = CompletableDeferred<Unit>(parent = null)
-            .apply { completeExceptionally(TestException()) }
+            .apply { cancel(TestException()) }
         val d2 = CompletableDeferred<Unit>(parent = null)
-            .apply { completeExceptionally(TestException()) }
+            .apply { cancel(TestException()) }
         val job = async { expect(3) }
         expect(1)
         try {
diff --git a/common/kotlinx-coroutines-core-common/test/CompletableDeferredTest.kt b/common/kotlinx-coroutines-core-common/test/CompletableDeferredTest.kt
index 4b78b30..f9a71b8 100644
--- a/common/kotlinx-coroutines-core-common/test/CompletableDeferredTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/CompletableDeferredTest.kt
@@ -19,7 +19,6 @@
         assertEquals(true, c.isActive)
         assertEquals(false, c.isCancelled)
         assertEquals(false, c.isCompleted)
-        assertEquals(false, c.isCompletedExceptionally)
         assertThrows<IllegalStateException> { c.getCancellationException() }
         assertThrows<IllegalStateException> { c.getCompleted() }
         assertThrows<IllegalStateException> { c.getCompletionExceptionOrNull() }
@@ -38,32 +37,12 @@
         assertEquals(false, c.isActive)
         assertEquals(false, c.isCancelled)
         assertEquals(true, c.isCompleted)
-        assertEquals(false, c.isCompletedExceptionally)
         assertTrue(c.getCancellationException() is JobCancellationException)
         assertEquals("OK", c.getCompleted())
         assertEquals(null, c.getCompletionExceptionOrNull())
     }
 
     @Test
-    fun testCompleteWithException() {
-        val c = CompletableDeferred<String>()
-        assertEquals(true, c.completeExceptionally(TestException()))
-        checkCompleteTestException(c)
-        assertEquals(false, c.completeExceptionally(TestException()))
-        checkCompleteTestException(c)
-    }
-
-    private fun checkCompleteTestException(c: CompletableDeferred<String>) {
-        assertEquals(false, c.isActive)
-        assertEquals(false, c.isCancelled)
-        assertEquals(true, c.isCompleted)
-        assertEquals(true, c.isCompletedExceptionally)
-        assertTrue(c.getCancellationException() is JobCancellationException)
-        assertThrows<TestException> { c.getCompleted() }
-        assertTrue(c.getCompletionExceptionOrNull() is TestException)
-    }
-
-    @Test
     fun testCancel() {
         val c = CompletableDeferred<String>()
         assertEquals(true, c.cancel())
@@ -76,7 +55,6 @@
         assertEquals(false, c.isActive)
         assertEquals(true, c.isCancelled)
         assertEquals(true, c.isCompleted)
-        assertEquals(true, c.isCompletedExceptionally)
         assertThrows<CancellationException> { c.getCompleted() }
         assertTrue(c.getCompletionExceptionOrNull() is CancellationException)
     }
@@ -94,7 +72,6 @@
         assertEquals(false, c.isActive)
         assertEquals(true, c.isCancelled)
         assertEquals(true, c.isCompleted)
-        assertEquals(true, c.isCompletedExceptionally)
         assertTrue(c.getCancellationException() is JobCancellationException)
         assertThrows<TestException> { c.getCompleted() }
         assertTrue(c.getCompletionExceptionOrNull() is TestException)
@@ -108,7 +85,11 @@
         parent.cancel()
         assertEquals(false, parent.isActive)
         assertEquals(true, parent.isCancelled)
-        checkCancel(c)
+        assertEquals(false, c.isActive)
+        assertEquals(true, c.isCancelled)
+        assertEquals(true, c.isCompleted)
+        assertThrows<CancellationException> { c.getCompleted() }
+        assertTrue(c.getCompletionExceptionOrNull() is CancellationException)
     }
 
     @Test
@@ -128,8 +109,8 @@
         val c = CompletableDeferred<String>(parent)
         checkFresh(c)
         assertEquals(true, parent.isActive)
-        assertEquals(true, c.completeExceptionally(TestException()))
-        checkCompleteTestException(c)
+        assertEquals(true, c.cancel(TestException()))
+        checkCancelWithException(c)
         assertEquals(true, parent.isActive)
     }
 
diff --git a/common/kotlinx-coroutines-core-common/test/CoroutineExceptionHandlerTest.kt b/common/kotlinx-coroutines-core-common/test/CoroutineExceptionHandlerTest.kt
index 366f1e4..80ddd46 100644
--- a/common/kotlinx-coroutines-core-common/test/CoroutineExceptionHandlerTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/CoroutineExceptionHandlerTest.kt
@@ -7,23 +7,42 @@
 import kotlin.test.*
 
 class CoroutineExceptionHandlerTest : TestBase() {
+    // Parent Job() does not handle exception --> handler is invoked on child crash
     @Test
-    fun testCoroutineExceptionHandlerCreator() = runTest {
+    fun testJob() = runTest {
         expect(1)
         var coroutineException: Throwable? = null
         val handler = CoroutineExceptionHandler { _, ex ->
             coroutineException = ex
             expect(3)
         }
-
-        val job = launch(handler + Job()) {
+        val parent = Job()
+        val job = launch(handler + parent) {
             throw TestException()
         }
-
         expect(2)
         job.join()
         finish(4)
         assertTrue(coroutineException is TestException)
+        assertTrue(parent.isCancelled)
+    }
+
+    // Parent CompletableDeferred() "handles" exception --> handler is NOT invoked on child crash
+    @Test
+    fun testCompletableDeferred() = runTest {
+        expect(1)
+        val handler = CoroutineExceptionHandler { _, ex ->
+            expectUnreached()
+        }
+        val parent = CompletableDeferred<Unit>()
+        val job = launch(handler + parent) {
+            throw TestException()
+        }
+        expect(2)
+        job.join()
+        finish(3)
+        assertTrue(parent.isCancelled)
+        assertTrue(parent.getCompletionExceptionOrNull() is TestException)
     }
 
     private class TestException: RuntimeException()
diff --git a/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt b/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt
index 916186d..db59aa5 100644
--- a/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt
@@ -293,7 +293,7 @@
             expect(4)
             throw TestException("Crashed")
         }
-        launch(parent, CoroutineStart.UNDISPATCHED) {
+        val child = launch(parent, CoroutineStart.UNDISPATCHED) {
             expect(2)
             try {
                 yield() // to test
@@ -309,6 +309,8 @@
         expect(6)
         parent.join() // make sure crashed parent still waits for its child
         finish(8)
+        // make sure the child is cancelled
+        assertTrue(child.isCancelled)
     }
 
     @Test
diff --git a/common/kotlinx-coroutines-core-common/test/FailedJobTest.kt b/common/kotlinx-coroutines-core-common/test/FailedJobTest.kt
new file mode 100644
index 0000000..975e376
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/test/FailedJobTest.kt
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
+
+package kotlinx.coroutines.experimental
+
+import kotlin.test.*
+
+// see https://github.com/Kotlin/kotlinx.coroutines/issues/585
+class FailedJobTest : TestBase() {
+    @Test
+    fun testCancelledJob() = runTest {
+        expect(1)
+        val job = launch {
+            expectUnreached()
+        }
+        expect(2)
+        job.cancelAndJoin()
+        finish(3)
+        assertTrue(job.isCompleted)
+        assertTrue(!job.isActive)
+        assertTrue(job.isCancelled)
+    }
+
+    @Test
+    fun testFailedJob() = runTest(
+        unhandled = listOf({it -> it is TestException })
+    ) {
+        expect(1)
+        val job = launch(NonCancellable) {
+            expect(3)
+            throw TestException()
+        }
+        expect(2)
+        job.join()
+        finish(4)
+        assertTrue(job.isCompleted)
+        assertTrue(!job.isActive)
+        assertTrue(job.isCancelled)
+    }
+
+    @Test
+    fun testFailedChildJob() = runTest(
+        unhandled = listOf({it -> it is TestException })
+    ) {
+        expect(1)
+        val job = launch(NonCancellable) {
+            expect(3)
+            launch {
+                throw TestException()
+            }
+        }
+        expect(2)
+        job.join()
+        finish(4)
+        assertTrue(job.isCompleted)
+        assertTrue(!job.isActive)
+        assertTrue(job.isCancelled)
+    }
+
+    private class TestException : Exception()
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/test/JobStatesTest.kt b/common/kotlinx-coroutines-core-common/test/JobStatesTest.kt
new file mode 100644
index 0000000..9306157
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/test/JobStatesTest.kt
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
+
+package kotlinx.coroutines.experimental
+
+import kotlin.test.*
+
+/**
+ * Tests that the state transitions of a [Job] correspond to the state table
+ * that is presented in the [Job] documentation.
+ */
+class JobStatesTest : TestBase() {
+    @Test
+    public fun testNormalCompletion() = runTest {
+        expect(1)
+        val job = launch(start = CoroutineStart.LAZY) {
+            expect(2)
+            // launches child
+            launch {
+                expect(4)
+            }
+            // completes normally
+        }
+        // New job
+        assertFalse(job.isActive)
+        assertFalse(job.isCompleted)
+        assertFalse(job.isCancelled)
+        // New -> Active
+        job.start()
+        assertTrue(job.isActive)
+        assertFalse(job.isCompleted)
+        assertFalse(job.isCancelled)
+        // Active -> Completing
+        yield() // scheduled & starts child
+        expect(3)
+        assertTrue(job.isActive)
+        assertFalse(job.isCompleted)
+        assertFalse(job.isCancelled)
+        // Completing -> Completed
+        yield()
+        finish(5)
+        assertFalse(job.isActive)
+        assertTrue(job.isCompleted)
+        assertFalse(job.isCancelled)
+    }
+
+    @Test
+    public fun testCompletingFailed() = runTest(
+        unhandled = listOf({ it -> it is TestException })
+    ) {
+        expect(1)
+        val job = launch(NonCancellable, start = CoroutineStart.LAZY) {
+            expect(2)
+            // launches child
+            launch {
+                expect(4)
+                throw TestException()
+            }
+            // completes normally
+        }
+        // New job
+        assertFalse(job.isActive)
+        assertFalse(job.isCompleted)
+        assertFalse(job.isCancelled)
+        // New -> Active
+        job.start()
+        assertTrue(job.isActive)
+        assertFalse(job.isCompleted)
+        assertFalse(job.isCancelled)
+        // Active -> Completing
+        yield() // scheduled & starts child
+        expect(3)
+        assertTrue(job.isActive)
+        assertFalse(job.isCompleted)
+        assertFalse(job.isCancelled)
+        // Completing -> Cancelled
+        yield()
+        finish(5)
+        assertFalse(job.isActive)
+        assertTrue(job.isCompleted)
+        assertTrue(job.isCancelled)
+    }
+
+    @Test
+    public fun testFailed() = runTest(
+        unhandled = listOf({ it -> it is TestException })
+    ) {
+        expect(1)
+        val job = launch(NonCancellable, start = CoroutineStart.LAZY) {
+            expect(2)
+            // launches child
+            launch(start = CoroutineStart.ATOMIC) {
+                expect(4)
+            }
+            // failing
+            throw TestException()
+        }
+        // New job
+        assertFalse(job.isActive)
+        assertFalse(job.isCompleted)
+        assertFalse(job.isCancelled)
+        // New -> Active
+        job.start()
+        assertTrue(job.isActive)
+        assertFalse(job.isCompleted)
+        assertFalse(job.isCancelled)
+        // Active -> Cancelling
+        yield() // scheduled & starts child
+        expect(3)
+        assertFalse(job.isActive)
+        assertFalse(job.isCompleted)
+        assertTrue(job.isCancelled)
+        // Cancelling -> Cancelled
+        yield()
+        finish(5)
+        assertFalse(job.isActive)
+        assertTrue(job.isCompleted)
+        assertTrue(job.isCancelled)
+    }
+
+    @Test
+    public fun testCancelling() = runTest {
+        expect(1)
+        val job = launch(NonCancellable, start = CoroutineStart.LAZY) {
+            expect(2)
+            // launches child
+            launch(start = CoroutineStart.ATOMIC) {
+                expect(4)
+            }
+            // completes normally
+        }
+        // New job
+        assertFalse(job.isActive)
+        assertFalse(job.isCompleted)
+        assertFalse(job.isCancelled)
+        // New -> Active
+        job.start()
+        assertTrue(job.isActive)
+        assertFalse(job.isCompleted)
+        assertFalse(job.isCancelled)
+        // Active -> Completing
+        yield() // scheduled & starts child
+        expect(3)
+        assertTrue(job.isActive)
+        assertFalse(job.isCompleted)
+        assertFalse(job.isCancelled)
+        // Completing -> Cancelling
+        job.cancel()
+        assertFalse(job.isActive)
+        assertFalse(job.isCompleted)
+        assertTrue(job.isCancelled)
+        // Cancelling -> Cancelled
+        yield()
+        finish(5)
+        assertFalse(job.isActive)
+        assertTrue(job.isCompleted)
+        assertTrue(job.isCancelled)
+    }
+
+    private class TestException : Exception()
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt
index 5c455e4..9b7cc1d 100644
--- a/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt
@@ -62,8 +62,7 @@
                 send(2) // will get cancelled
             } catch (e: Exception) {
                 finish(6)
-                check(e is JobCancellationException && e.job == coroutineContext[Job])
-                check(e.cause is TestException)
+                check(e is TestException)
                 throw e
             }
             expectUnreached()
diff --git a/core/kotlinx-coroutines-core/src/Builders.kt b/core/kotlinx-coroutines-core/src/Builders.kt
index 8e2b9df..8c93f4c 100644
--- a/core/kotlinx-coroutines-core/src/Builders.kt
+++ b/core/kotlinx-coroutines-core/src/Builders.kt
@@ -55,7 +55,7 @@
         if (privateEventLoop) require(eventLoop is BlockingEventLoop)
     }
 
-    override fun onCompletionInternal(state: Any?, mode: Int) {
+    override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
         // wake up blocked thread
         if (Thread.currentThread() != blockedThread)
             LockSupport.unpark(blockedThread)
diff --git a/core/kotlinx-coroutines-core/src/CoroutineExceptionHandlerImpl.kt b/core/kotlinx-coroutines-core/src/CoroutineExceptionHandlerImpl.kt
index 836592b..d570f29 100644
--- a/core/kotlinx-coroutines-core/src/CoroutineExceptionHandlerImpl.kt
+++ b/core/kotlinx-coroutines-core/src/CoroutineExceptionHandlerImpl.kt
@@ -21,8 +21,15 @@
 internal actual fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable) {
     // use additional extension handlers
     for (handler in handlers) {
-        handler.handleException(context, exception)
+        try {
+            handler.handleException(context, exception)
+        } catch (t: Throwable) {
+            // Use thread's handler if custom handler failed to handle exception
+            val currentThread = Thread.currentThread()
+            currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, handlerException(exception, t))
+        }
     }
+
     // use thread's handler
     val currentThread = Thread.currentThread()
     currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
diff --git a/core/kotlinx-coroutines-core/src/channels/Actor.kt b/core/kotlinx-coroutines-core/src/channels/Actor.kt
index ed64588..abdc340 100644
--- a/core/kotlinx-coroutines-core/src/channels/Actor.kt
+++ b/core/kotlinx-coroutines-core/src/channels/Actor.kt
@@ -201,9 +201,8 @@
         _channel.cancel(cause)
     }
 
-    override fun handleJobException(exception: Throwable) {
-        handleCoroutineException(context, exception, this)
-    }
+    override val cancelsParent: Boolean get() = true
+    override fun handleJobException(exception: Throwable) = handleExceptionViaHandler(parentContext, exception)
 }
 
 private class LazyActorCoroutine<E>(
diff --git a/core/kotlinx-coroutines-core/src/internal/Concurrent.kt b/core/kotlinx-coroutines-core/src/internal/Concurrent.kt
index 457440f..8204b97 100644
--- a/core/kotlinx-coroutines-core/src/internal/Concurrent.kt
+++ b/core/kotlinx-coroutines-core/src/internal/Concurrent.kt
@@ -4,6 +4,7 @@
 
 package kotlinx.coroutines.experimental.internal
 
+import java.util.*
 import java.util.concurrent.*
 import kotlin.concurrent.withLock as withLockJvm
 
@@ -13,3 +14,5 @@
 internal actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLock
 
 internal actual inline fun <T> ReentrantLock.withLock(action: () -> T) = this.withLockJvm(action)
+
+internal actual fun <E> identitySet(expectedSize: Int): MutableSet<E> = Collections.newSetFromMap(IdentityHashMap(expectedSize))
diff --git a/core/kotlinx-coroutines-core/test/AsyncJvmTest.kt b/core/kotlinx-coroutines-core/test/AsyncJvmTest.kt
index b30c8bc..f5bcba7 100644
--- a/core/kotlinx-coroutines-core/test/AsyncJvmTest.kt
+++ b/core/kotlinx-coroutines-core/test/AsyncJvmTest.kt
@@ -27,11 +27,11 @@
         expect(2)
         yield() // to async
         expect(4)
-        check(d.isActive && !d.isCompleted && !d.isCompletedExceptionally)
+        check(d.isActive && !d.isCompleted && !d.isCancelled)
         check(d.cancel())
-        check(!d.isActive && !d.isCompleted && !d.isCompletedExceptionally)
-        check(d.cancel()) // second attempt returns false
-        check(!d.isActive && !d.isCompleted && !d.isCompletedExceptionally)
+        check(!d.isActive && !d.isCompleted && d.isCancelled)
+        check(d.cancel()) // second attempt still returns true (still cancelling)
+        check(!d.isActive && !d.isCompleted && d.isCancelled)
         expect(5)
         try {
             d.await() // awaits
@@ -40,7 +40,9 @@
             expect(7)
             check(e is CancellationException)
         }
-        check(!d.isActive && d.isCompleted && d.isCompletedExceptionally && d.isCancelled)
+        check(!d.isActive && d.isCompleted && d.isCancelled)
+        check(!d.cancel()) // now cancel returns false -- already cancelled
+        check(!d.isActive && d.isCompleted && d.isCancelled)
         finish(8)
     }
 }
diff --git a/core/kotlinx-coroutines-core/test/AwaitJvmTest.kt b/core/kotlinx-coroutines-core/test/AwaitJvmTest.kt
index df36a20..b52c4ca 100644
--- a/core/kotlinx-coroutines-core/test/AwaitJvmTest.kt
+++ b/core/kotlinx-coroutines-core/test/AwaitJvmTest.kt
@@ -12,7 +12,7 @@
         // This test is to make sure that handlers installed on the second deferred do not leak
         val d1 = CompletableDeferred<Int>()
         val d2 = CompletableDeferred<Int>()
-        d1.completeExceptionally(TestException()) // first is crashed
+        d1.cancel(TestException()) // first is crashed
         val iterations = 3_000_000 * stressTestMultiplier
         for (iter in 1..iterations) {
             try {
diff --git a/core/kotlinx-coroutines-core/test/CoroutinesJvmTest.kt b/core/kotlinx-coroutines-core/test/CoroutinesJvmTest.kt
index d693cde..ba8bf1e 100644
--- a/core/kotlinx-coroutines-core/test/CoroutinesJvmTest.kt
+++ b/core/kotlinx-coroutines-core/test/CoroutinesJvmTest.kt
@@ -28,8 +28,8 @@
         repeat(n) {
             // create a child that already completed
             val child = launch(start = CoroutineStart.UNDISPATCHED) { /* do nothing */ }
-            // attach it manually
-            parent.attachChild(child)
+            // attach it manually via internal API
+            parent.attachChild(child as ChildJob)
         }
         parent.cancelAndJoin() // cancel parent, make sure no stack overflow
     }
diff --git a/core/kotlinx-coroutines-core/test/JobChildStressTest.kt b/core/kotlinx-coroutines-core/test/JobChildStressTest.kt
new file mode 100644
index 0000000..fca7ff6
--- /dev/null
+++ b/core/kotlinx-coroutines-core/test/JobChildStressTest.kt
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.experimental
+
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+import kotlin.test.*
+
+class JobChildStressTest : TestBase() {
+    private val N_ITERATIONS = 10_000 * stressTestMultiplier
+    private val pool = newFixedThreadPoolContext(3, "JobChildStressTest")
+
+    @After
+    fun tearDown() {
+        pool.close()
+    }
+
+    /**
+     * Perform concurrent launch of a child job & cancellation of the explicit parent job
+     */
+    @Test
+    @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
+    fun testChild() = runTest {
+        val barrier = CyclicBarrier(3)
+        repeat(N_ITERATIONS) {
+            var wasLaunched = false
+            var unhandledException: Throwable? = null
+            val handler = CoroutineExceptionHandler { _, ex ->
+                unhandledException = ex
+            }
+            val scope = CoroutineScope(pool + handler)
+            val parent = CompletableDeferred<Unit>()
+            // concurrent child launcher
+            val launcher = scope.launch {
+                barrier.await()
+                // A: launch child for a parent job
+                launch(parent) {
+                    wasLaunched = true
+                    throw TestException()
+                }
+            }
+            // concurrent cancel
+            val canceller = scope.launch {
+                barrier.await()
+                // B: cancel parent job of a child
+                parent.cancel()
+            }
+            barrier.await()
+            joinAll(launcher, canceller, parent)
+            assertNull(unhandledException)
+            if (wasLaunched) {
+                val exception = parent.getCompletionExceptionOrNull()
+                assertTrue(exception is TestException, "exception=$exception")
+            }
+        }
+    }
+
+    private class TestException : Exception()
+}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/test/TestBase.kt b/core/kotlinx-coroutines-core/test/TestBase.kt
index 325e76f..7d576cf 100644
--- a/core/kotlinx-coroutines-core/test/TestBase.kt
+++ b/core/kotlinx-coroutines-core/test/TestBase.kt
@@ -63,7 +63,9 @@
         error.compareAndSet(null, cause)
         println("$message: $cause")
         cause.printStackTrace(System.out)
-    } 
+        println("--- Detected at ---")
+        Throwable().printStackTrace(System.out)
+    }
 
     /**
      * Throws [IllegalStateException] when `value` is false like `check` in stdlib, but also ensures that the
@@ -149,7 +151,6 @@
                     !unhandled[exCount - 1](e) ->
                         printError("Unhandled exception was unexpected: $e", e)
                 }
-                context[Job]?.cancel(e)
             })
         } catch (e: Throwable) {
             ex = e
diff --git a/core/kotlinx-coroutines-core/test/channels/ActorTest.kt b/core/kotlinx-coroutines-core/test/channels/ActorTest.kt
index a2eff95..478d95e 100644
--- a/core/kotlinx-coroutines-core/test/channels/ActorTest.kt
+++ b/core/kotlinx-coroutines-core/test/channels/ActorTest.kt
@@ -128,7 +128,7 @@
             job.await()
             expectUnreached()
         } catch (e: CancellationException) {
-            assertTrue(e.message?.contains("Job was cancelled normally") ?: false)
+            assertTrue(e.message?.contains("Job was cancelled") ?: false)
         }
 
         finish(3)
diff --git a/core/kotlinx-coroutines-core/test/exceptions/CoroutineExceptionHandlerJvmTest.kt b/core/kotlinx-coroutines-core/test/exceptions/CoroutineExceptionHandlerJvmTest.kt
new file mode 100644
index 0000000..b9fbfc9
--- /dev/null
+++ b/core/kotlinx-coroutines-core/test/exceptions/CoroutineExceptionHandlerJvmTest.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.experimental.exceptions
+
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+import org.junit.Test
+import kotlin.test.*
+
+class CoroutineExceptionHandlerJvmTest : TestBase() {
+
+    private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler()
+    private lateinit var caughtException: Throwable
+
+    @Before
+    fun setUp() {
+        Thread.setDefaultUncaughtExceptionHandler({ _, e -> caughtException = e })
+    }
+
+    @After
+    fun tearDown() {
+        Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
+    }
+
+    @Test
+    fun testFailingHandler() = runBlocking {
+        expect(1)
+        val job = GlobalScope.launch(CoroutineExceptionHandler { _, _ -> throw AssertionError() }) {
+            expect(2)
+            throw TestException()
+        }
+
+        job.join()
+        assertTrue(caughtException is RuntimeException)
+        assertTrue(caughtException.cause is AssertionError)
+        assertTrue(caughtException.suppressed()[0] is TestException)
+
+        finish(3)
+    }
+
+    private class TestException : Throwable()
+}
diff --git a/core/kotlinx-coroutines-core/test/exceptions/Exceptions.kt b/core/kotlinx-coroutines-core/test/exceptions/Exceptions.kt
index ba8428f..d97a433 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/Exceptions.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/Exceptions.kt
@@ -34,17 +34,22 @@
 }
 
 class CapturingHandler : AbstractCoroutineContextElement(CoroutineExceptionHandler),
-    CoroutineExceptionHandler {
-    val unhandled: MutableList<Throwable> = Collections.synchronizedList(ArrayList<Throwable>())!!
+    CoroutineExceptionHandler
+{
+    private var unhandled: ArrayList<Throwable>? = ArrayList()
 
-    override fun handleException(context: CoroutineContext, exception: Throwable) {
-        unhandled.add(exception)
+    override fun handleException(context: CoroutineContext, exception: Throwable) = synchronized<Unit>(this) {
+        unhandled!!.add(exception)
     }
 
-    fun getException(): Throwable {
-        val size = unhandled.size
+    fun getExceptions(): List<Throwable> = synchronized(this) {
+        return unhandled!!.also { unhandled = null }
+    }
+
+    fun getException(): Throwable = synchronized(this) {
+        val size = unhandled!!.size
         assert(size == 1) { "Expected one unhandled exception, but have $size: $unhandled" }
-        return unhandled[0]
+        return unhandled!![0].also { unhandled = null }
     }
 }
 
@@ -63,5 +68,5 @@
 ): List<Throwable> {
     val handler = CapturingHandler()
     runBlocking(context + handler, block = block)
-    return handler.unhandled
+    return handler.getExceptions()
 }
diff --git a/core/kotlinx-coroutines-core/test/exceptions/JobBasicCancellationTest.kt b/core/kotlinx-coroutines-core/test/exceptions/JobBasicCancellationTest.kt
index 08bbb98..8421d5e 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/JobBasicCancellationTest.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/JobBasicCancellationTest.kt
@@ -150,8 +150,8 @@
     @Test
     fun testConsecutiveCancellation() {
         val deferred = CompletableDeferred<Int>()
-        assertTrue(deferred.completeExceptionally(IndexOutOfBoundsException()))
-        assertFalse(deferred.completeExceptionally(AssertionError()))
+        assertTrue(deferred.cancel(IndexOutOfBoundsException()))
+        assertFalse(deferred.cancel(AssertionError())) // second cancel is too late
         val cause = deferred.getCancellationException().cause!!
         assertTrue(cause is IndexOutOfBoundsException)
         assertNull(cause.cause)
diff --git a/core/kotlinx-coroutines-core/test/exceptions/JobExceptionHandlingTest.kt b/core/kotlinx-coroutines-core/test/exceptions/JobExceptionHandlingTest.kt
index 4d50096..f91399b 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/JobExceptionHandlingTest.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/JobExceptionHandlingTest.kt
@@ -270,4 +270,36 @@
         assertTrue(suppressed[0] is IOException)
         assertTrue(suppressed[1] is IllegalArgumentException)
     }
+
+    @Test
+    fun testBadException() = runTest(unhandled = listOf({e -> e is BadException})) {
+        val job = launch(Job()) {
+            expect(2)
+            launch {
+                expect(3)
+                throw BadException()
+            }
+
+            launch(start = CoroutineStart.ATOMIC) {
+                expect(4)
+                throw BadException()
+            }
+
+            yield()
+            BadException()
+        }
+
+        expect(1)
+        yield()
+        yield()
+        expect(5)
+        job.join()
+        finish(6)
+    }
+
+    private class BadException : Exception() {
+        override fun hashCode(): Int {
+            throw AssertionError()
+        }
+    }
 }
diff --git a/core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt b/core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt
index 878e3ba..bd0b597 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt
@@ -33,36 +33,39 @@
                 val job = launch(NonCancellable) {
                     launch(start = CoroutineStart.ATOMIC) {
                         barrier.await()
-                        throw ArithmeticException()
+                        throw TestException1()
                     }
                     launch(start = CoroutineStart.ATOMIC) {
                         barrier.await()
-                        throw IOException()
+                        throw TestException2()
                     }
                     launch(start = CoroutineStart.ATOMIC) {
                         barrier.await()
-                        throw IllegalArgumentException()
+                        throw TestException3()
                     }
-                    delay(Long.MAX_VALUE)
+                    delay(1000) // to avoid OutOfMemory errors....
                 }
                 barrier.await()
                 job.join()
             }
-
             val classes = mutableSetOf(
-                IllegalArgumentException::class,
-                IOException::class, ArithmeticException::class)
-
+                TestException1::class,
+                TestException2::class,
+                TestException3::class
+            )
             val suppressedExceptions = exception.suppressed().toSet()
             assertTrue(classes.remove(exception::class),
-                "Failed to remove ${exception::class} from $suppressedExceptions")
-
+                "Failed to remove ${exception::class} from $suppressedExceptions"
+            )
             for (throwable in suppressedExceptions.toSet()) { // defensive copy
                 assertTrue(classes.remove(throwable::class),
                     "Failed to remove ${throwable::class} from $suppressedExceptions")
             }
-
             assertTrue(classes.isEmpty(), "Expected all exception to be present, but following exceptions are missing: $classes")
         }
     }
+
+    private class TestException1 : Exception()
+    private class TestException2 : Exception()
+    private class TestException3 : Exception()
 }
diff --git a/core/kotlinx-coroutines-core/test/exceptions/JobNestedExceptionsTest.kt b/core/kotlinx-coroutines-core/test/exceptions/JobNestedExceptionsTest.kt
index 5affae1..5763bb4 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/JobNestedExceptionsTest.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/JobNestedExceptionsTest.kt
@@ -68,22 +68,19 @@
     fun testNestedAtomicThrow() {
         val exception = runBlock {
             expect(1)
-            val job = launch(NonCancellable, start = CoroutineStart.ATOMIC) {
+            val job = launch(NonCancellable + CoroutineName("outer"), start = CoroutineStart.ATOMIC) {
                 expect(2)
-
-                launch(start = CoroutineStart.ATOMIC) {
-                    expect(3)
+                launch(CoroutineName("nested"), start = CoroutineStart.ATOMIC) {
+                    expect(4)
                     throw IOException()
                 }
-
+                expect(3)
                 throw ArithmeticException()
             }
-
             job.join()
-            finish(4)
+            finish(5)
         }
-
-        assertTrue(exception is ArithmeticException)
+        assertTrue(exception is ArithmeticException, "Found $exception")
         checkException<IOException>(exception.suppressed()[0])
     }
 
@@ -91,31 +88,33 @@
     fun testChildThrowsDuringCompletion() {
         val exceptions = runBlockForMultipleExceptions {
             expect(1)
-            val job = launch(NonCancellable, start = CoroutineStart.ATOMIC) {
+            val job = launch(NonCancellable + CoroutineName("outer"), start = CoroutineStart.ATOMIC) {
                 expect(2)
-                launch(start = CoroutineStart.ATOMIC) {
-                    expect(3)
-                    launch(start = CoroutineStart.ATOMIC) {
+                launch(CoroutineName("nested"), start = CoroutineStart.ATOMIC) {
+                    expect(4)
+                    launch(CoroutineName("nested2"), start = CoroutineStart.ATOMIC) {
                         // This child attaches to the parent and throws after parent completion
-                        expect(4)
+                        expect(6)
                         throw NullPointerException()
                     }
-
+                    expect(5)
                     throw IOException()
                 }
-
+                expect(3)
                 throw ArithmeticException()
             }
 
             job.join()
-            finish(5)
+            finish(7)
         }
 
-        assertEquals(1, exceptions.size)
+        assertEquals(1, exceptions.size, "Found $exceptions")
         val exception = exceptions[0]
+        assertTrue(exception is ArithmeticException, "Exception is $exception")
         val suppressed = exception.suppressed()
-        checkException<IOException>(suppressed[0])
-        checkException<NullPointerException>(suppressed[1])
+        // NOTE: the order of suppressed exceptions asserted below is not guaranteed and may change in the future
+        checkException<NullPointerException>(suppressed[0])
+        checkException<IOException>(suppressed[1])
         checkCycles(exception)
     }
 }
diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
index b2809fb..b494563 100644
--- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
+++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
@@ -160,7 +160,7 @@
         } catch (e: Throwable) {
             // unwrap original cause from ExecutionException
             val original = (e as? ExecutionException)?.cause ?: e
-            CompletableDeferred<T>().also { it.completeExceptionally(original) }
+            CompletableDeferred<T>().also { it.cancel(original) }
         }
     }
     val result = CompletableDeferred<T>()
@@ -168,7 +168,7 @@
         if (exception == null) {
             result.complete(value)
         } else {
-            result.completeExceptionally(exception)
+            result.cancel(exception)
         }
     }
     if (this is Future<*>) result.cancelFutureOnCompletion(this)
diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
index 7f125f7..0b0006e 100644
--- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
+++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
@@ -277,7 +277,7 @@
         }
         val deferred = future.asDeferred()
 
-        assertTrue(deferred.isCompletedExceptionally)
+        assertTrue(deferred.isCancelled)
         val completionException = deferred.getCompletionExceptionOrNull()!!
         assertTrue(completionException is TestException)
         assertEquals("something went wrong", completionException.message)
@@ -306,7 +306,7 @@
             deferred.await()
             fail("deferred.await() should throw an exception")
         } catch (e: Exception) {
-            assertTrue(deferred.isCompletedExceptionally)
+            assertTrue(deferred.isCancelled)
             assertTrue(e is CompletionException) // that's how supplyAsync wraps it
             val cause = e.cause!!
             assertTrue(cause is TestException)
diff --git a/js/kotlinx-coroutines-core-js/src/internal/Concurrent.kt b/js/kotlinx-coroutines-core-js/src/internal/Concurrent.kt
index c4f3d7f..a8b541a 100644
--- a/js/kotlinx-coroutines-core-js/src/internal/Concurrent.kt
+++ b/js/kotlinx-coroutines-core-js/src/internal/Concurrent.kt
@@ -14,3 +14,5 @@
 }
 
 internal actual fun <E> subscriberList(): SubscribersList<E> = CopyOnWriteList()
+
+internal actual fun <E> identitySet(expectedSize: Int): MutableSet<E> = HashSet(expectedSize)
diff --git a/js/kotlinx-coroutines-core-js/test/TestBase.kt b/js/kotlinx-coroutines-core-js/test/TestBase.kt
index 473f279..7873036 100644
--- a/js/kotlinx-coroutines-core-js/test/TestBase.kt
+++ b/js/kotlinx-coroutines-core-js/test/TestBase.kt
@@ -81,7 +81,6 @@
                 !unhandled[exCount - 1](e) ->
                     printError("Unhandled exception was unexpected: $e", e)
             }
-            context[Job]?.cancel(e)
         }).catch { e ->
             ex = e
             if (expected != null) {
diff --git a/native/kotlinx-coroutines-core-native/src/internal/Concurrent.kt b/native/kotlinx-coroutines-core-native/src/internal/Concurrent.kt
index 13a6d6a..ed872b3 100644
--- a/native/kotlinx-coroutines-core-native/src/internal/Concurrent.kt
+++ b/native/kotlinx-coroutines-core-native/src/internal/Concurrent.kt
@@ -14,3 +14,5 @@
 }
 
 internal actual fun <E> subscriberList(): MutableList<E> = CopyOnWriteList<E>()
+
+internal actual fun <E> identitySet(expectedSize: Int): MutableSet<E> = HashSet()
diff --git a/native/kotlinx-coroutines-core-native/test/TestBase.kt b/native/kotlinx-coroutines-core-native/test/TestBase.kt
index 7f75e44..81e680b 100644
--- a/native/kotlinx-coroutines-core-native/test/TestBase.kt
+++ b/native/kotlinx-coroutines-core-native/test/TestBase.kt
@@ -76,7 +76,6 @@
                     !unhandled[exCount - 1](e) ->
                         printError("Unhandled exception was unexpected: $e", e)
                 }
-                context[Job]?.cancel(e)
             })
         } catch (e: Throwable) {
             ex = e