Merge branch 'develop' into handle-exception-fix
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index cd60af1..b1c6765 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -1,4 +1,5 @@
public abstract class kotlinx/coroutines/experimental/AbstractCoroutine : kotlin/coroutines/experimental/Continuation, kotlinx/coroutines/experimental/CoroutineScope, kotlinx/coroutines/experimental/Job {
+ protected final field parentContext Lkotlin/coroutines/experimental/CoroutineContext;
public fun <init> (Lkotlin/coroutines/experimental/CoroutineContext;Z)V
public synthetic fun <init> (Lkotlin/coroutines/experimental/CoroutineContext;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun getContext ()Lkotlin/coroutines/experimental/CoroutineContext;
@@ -392,7 +393,7 @@
public abstract interface class kotlinx/coroutines/experimental/Job : kotlin/coroutines/experimental/CoroutineContext$Element {
public static final field Key Lkotlinx/coroutines/experimental/Job$Key;
- public abstract fun attachChild (Lkotlinx/coroutines/experimental/Job;)Lkotlinx/coroutines/experimental/DisposableHandle;
+ public abstract fun attachChild (Lkotlinx/coroutines/experimental/ChildJob;)Lkotlinx/coroutines/experimental/ChildHandle;
public abstract fun cancel ()Z
public abstract fun cancel (Ljava/lang/Throwable;)Z
public abstract synthetic fun cancelChildren (Ljava/lang/Throwable;)V
@@ -467,7 +468,7 @@
public final class kotlinx/coroutines/experimental/NonCancellable : kotlin/coroutines/experimental/AbstractCoroutineContextElement, kotlinx/coroutines/experimental/Job {
public static final field INSTANCE Lkotlinx/coroutines/experimental/NonCancellable;
- public fun attachChild (Lkotlinx/coroutines/experimental/Job;)Lkotlinx/coroutines/experimental/DisposableHandle;
+ public fun attachChild (Lkotlinx/coroutines/experimental/ChildJob;)Lkotlinx/coroutines/experimental/ChildHandle;
public fun cancel ()Z
public fun cancel (Ljava/lang/Throwable;)Z
public synthetic fun cancelChildren (Ljava/lang/Throwable;)V
@@ -487,8 +488,9 @@
public fun start ()Z
}
-public final class kotlinx/coroutines/experimental/NonDisposableHandle : kotlinx/coroutines/experimental/DisposableHandle {
+public final class kotlinx/coroutines/experimental/NonDisposableHandle : kotlinx/coroutines/experimental/ChildHandle, kotlinx/coroutines/experimental/DisposableHandle {
public static final field INSTANCE Lkotlinx/coroutines/experimental/NonDisposableHandle;
+ public fun childCancelled (Ljava/lang/Throwable;)Z
public fun dispose ()V
public fun toString ()Ljava/lang/String;
}
diff --git a/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt b/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt
index f674a5e..020af59 100644
--- a/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt
+++ b/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt
@@ -5,6 +5,7 @@
package kotlinx.coroutines.experimental
import kotlinx.coroutines.experimental.CoroutineStart.*
+import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlin.coroutines.experimental.*
@@ -14,12 +15,12 @@
* This class implements completion [Continuation], [Job], and [CoroutineScope] interfaces.
* It stores the result of continuation in the state of the job.
* This coroutine waits for children coroutines to finish before completing and
- * is cancelled through an intermediate _cancelling_ state.
+ * fails through an intermediate _failing_ state.
*
* The following methods are available for override:
*
* * [onStart] is invoked when coroutine is create in not active state and is [started][Job.start].
- * * [onCancellation] is invoked as soon as coroutine is [cancelled][cancel] (becomes _cancelling_)
+ * * [onCancellation] is invoked as soon as coroutine is _failing_, or is cancelled,
* or when it completes for any reason.
* * [onCompleted] is invoked when coroutine completes with a value.
* * [onCompletedExceptionally] in invoked when coroutines completes with exception.
@@ -33,12 +34,22 @@
@Suppress("EXPOSED_SUPER_CLASS")
@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
- private val parentContext: CoroutineContext,
+ /**
+ * Context of the parent coroutine.
+ */
+ @JvmField
+ protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
+ /**
+ * Context of this coroutine that includes this coroutine as a [Job].
+ */
@Suppress("LeakingThis")
public final override val context: CoroutineContext = parentContext + this
- @Deprecated("Replaced with context", replaceWith = ReplaceWith("context"))
+
+ /**
+ * Context of this scope which is the same as the [context] of this coroutine.
+ */
public override val coroutineContext: CoroutineContext get() = context
override val isActive: Boolean get() = super<JobSupport>.isActive
@@ -66,20 +77,16 @@
}
/**
- * This function is invoked once when this coroutine is cancelled or is completed,
+ * This function is invoked once when this coroutine is cancelled
* similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
*
* The meaning of [cause] parameter:
* * Cause is `null` when job has completed normally.
* * Cause is an instance of [CancellationException] when job was cancelled _normally_.
* **It should not be treated as an error**. In particular, it should not be reported to error logs.
- * * Otherwise, the job had _failed_.
+ * * Otherwise, the job had been cancelled or failed with exception.
*/
- protected open fun onCancellation(cause: Throwable?) {}
-
- internal override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
- onCancellation(exceptionally?.cause)
- }
+ protected override fun onCancellation(cause: Throwable?) {}
/**
* This function is invoked once when job is completed normally with the specified [value].
@@ -89,10 +96,11 @@
/**
* This function is invoked once when job is completed exceptionally with the specified [exception].
*/
+ // todo: rename to onCancelled
protected open fun onCompletedExceptionally(exception: Throwable) {}
@Suppress("UNCHECKED_CAST")
- internal override fun onCompletionInternal(state: Any?, mode: Int) {
+ internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
if (state is CompletedExceptionally)
onCompletedExceptionally(state.cause)
else
diff --git a/common/kotlinx-coroutines-core-common/src/Builders.common.kt b/common/kotlinx-coroutines-core-common/src/Builders.common.kt
index 48d2d27..3a0e981 100644
--- a/common/kotlinx-coroutines-core-common/src/Builders.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/Builders.common.kt
@@ -281,20 +281,11 @@
// --------------- implementation ---------------
private open class StandaloneCoroutine(
- private val parentContext: CoroutineContext,
+ parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
- override fun hasOnFinishingHandler(update: Any?) = update is CompletedExceptionally
-
- override fun handleJobException(exception: Throwable) {
- handleCoroutineException(parentContext, exception, this)
- }
-
- override fun onFinishingInternal(update: Any?) {
- if (update is CompletedExceptionally && update.cause !is CancellationException) {
- parentContext[Job]?.cancel(update.cause)
- }
- }
+ override val cancelsParent: Boolean get() = true
+ override fun handleJobException(exception: Throwable) = handleExceptionViaHandler(parentContext, exception)
}
private class LazyStandaloneCoroutine(
diff --git a/common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt b/common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt
index d85b784..ba87286 100644
--- a/common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt
+++ b/common/kotlinx-coroutines-core-common/src/CompletableDeferred.kt
@@ -7,10 +7,11 @@
import kotlinx.coroutines.experimental.selects.*
/**
- * A [Deferred] that can be completed via public functions
- * [complete], [completeExceptionally], and [cancel].
+ * A [Deferred] that can be completed via public functions [complete] or [cancel][Job.cancel].
*
- * Completion functions return `false` when this deferred value is already complete or completing.
+ * Note, that [complete] functions returns `false` when this deferred value is already complete or completing,
+ * while [cancel][Job.cancel] returns `true` as long the deferred is still _cancelling_ and the corresponding
+ * exception is incorporated into the final [completion exception][getCompletionExceptionOrNull].
*
* An instance of completable deferred can be created by `CompletableDeferred()` function in _active_ state.
*
@@ -32,6 +33,7 @@
*
* Repeated invocations of this function have no effect and always produce `false`.
*/
+ @Deprecated(message = "Use cancel", replaceWith = ReplaceWith("cancel(exception)"))
public fun completeExceptionally(exception: Throwable): Boolean
}
@@ -61,7 +63,7 @@
parent: Job?
) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
init { initParentJobInternal(parent) }
- override val onCancelMode: Int get() = ON_CANCEL_MAKE_COMPLETING
+ override val onCancelComplete get() = true
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
override val onAwait: SelectClause1<T> get() = this
diff --git a/common/kotlinx-coroutines-core-common/src/CompletedExceptionally.kt b/common/kotlinx-coroutines-core-common/src/CompletedExceptionally.kt
index 6c564f2..da3825c 100644
--- a/common/kotlinx-coroutines-core-common/src/CompletedExceptionally.kt
+++ b/common/kotlinx-coroutines-core-common/src/CompletedExceptionally.kt
@@ -8,15 +8,16 @@
import kotlin.coroutines.experimental.*
/**
- * Class for an internal state of a job that had completed exceptionally, including cancellation.
+ * Class for an internal state of a job that was cancelled (completed exceptionally).
*
* **Note: This class cannot be used outside of internal coroutines framework**.
- * **Note: cannot be internal until we get rid of MutableDelegateContinuation in IO**
+ * **Note: cannot be internal and renamed until we get rid of MutableDelegateContinuation in IO**
*
* @param cause the exceptional completion cause. It's either original exceptional cause
* or artificial [CancellationException] if no cause was provided
* @suppress **This is unstable API and it is subject to change.**
*/
+// todo: rename to Cancelled
open class CompletedExceptionally(
@JvmField public val cause: Throwable
) {
@@ -24,20 +25,6 @@
}
/**
- * A specific subclass of [CompletedExceptionally] for cancelled jobs.
- *
- * **Note: This class cannot be used outside of internal coroutines framework**.
- *
- * @param job the job that was cancelled.
- * @param cause the exceptional completion cause. If `cause` is null, then a [CancellationException] is created.
- * @suppress **This is unstable API and it is subject to change.**
- */
-internal class Cancelled(
- job: Job,
- cause: Throwable?
-) : CompletedExceptionally(cause ?: JobCancellationException("Job was cancelled normally", null, job))
-
-/**
* A specific subclass of [CompletedExceptionally] for cancelled [AbstractContinuation].
*
* **Note: This class cannot be used outside of internal coroutines framework**.
diff --git a/common/kotlinx-coroutines-core-common/src/CompletionHandler.common.kt b/common/kotlinx-coroutines-core-common/src/CompletionHandler.common.kt
index c92634f..89f6a97 100644
--- a/common/kotlinx-coroutines-core-common/src/CompletionHandler.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/CompletionHandler.common.kt
@@ -43,3 +43,5 @@
// :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
// because we play type tricks on Kotlin/JS and handler is not necessarily a function there
internal expect fun CompletionHandler.invokeIt(cause: Throwable?)
+
+internal inline fun <reified T> CompletionHandler.isHandlerOf(): Boolean = this is T
diff --git a/common/kotlinx-coroutines-core-common/src/CoroutineExceptionHandler.kt b/common/kotlinx-coroutines-core-common/src/CoroutineExceptionHandler.kt
index 088cc69..2450039 100644
--- a/common/kotlinx-coroutines-core-common/src/CoroutineExceptionHandler.kt
+++ b/common/kotlinx-coroutines-core-common/src/CoroutineExceptionHandler.kt
@@ -18,38 +18,42 @@
*
* If there is a [Job] in the context and it's not a [caller], then [Job.cancel] is invoked.
* If invocation returned `true`, method terminates: now [Job] is responsible for handling an exception.
- * Otherwise, If there is [CoroutineExceptionHandler] in the context, it is used.
- * Otherwise all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and [Thread.uncaughtExceptionHandler] are invoked
+ * Otherwise, If there is [CoroutineExceptionHandler] in the context, it is used. If it throws an exception during handling
+ * or is absent, all instances of [CoroutineExceptionHandler] found via [ServiceLoader] and [Thread.uncaughtExceptionHandler] are invoked
+ * todo: Deprecate/hide this function.
*/
@JvmOverloads // binary compatibility
@InternalCoroutinesApi
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable, caller: Job? = null) {
- // if exception handling fails, make sure the original exception is not lost
+ // Ignore CancellationException (they are normal ways to terminate a coroutine)
+ if (exception is CancellationException) return // nothing to do
+ // Try propagate exception to parent
+ val job = context[Job]
+ if (job !== null && job !== caller && job.cancel(exception)) return // handle by parent
+ // otherwise -- use exception handlers
+ handleExceptionViaHandler(context, exception)
+}
+
+internal fun handleExceptionViaHandler(context: CoroutineContext, exception: Throwable) {
+ // Invoke exception handler from the context if present
try {
- // Ignore CancellationException (they are normal ways to terminate a coroutine)
- if (exception is CancellationException) {
- return
- }
- // If parent is successfully cancelled, we're done, it is now its responsibility to handle the exception
- val parent = context[Job]
- // E.g. actor registers itself in the context, in that case we should invoke handler
- if (parent !== null && parent !== caller && parent.cancel(exception)) {
- return
- }
- // If not, invoke exception handler from the context
context[CoroutineExceptionHandler]?.let {
it.handleException(context, exception)
return
}
- // If handler is not present in the context, fallback to the global handler
- handleCoroutineExceptionImpl(context, exception)
- } catch (handlerException: Throwable) {
- // simply rethrow if handler threw the original exception
- if (handlerException === exception) throw exception
- // handler itself crashed for some other reason -- that is bad -- keep both
- throw RuntimeException("Exception while trying to handle coroutine exception", exception).apply {
- addSuppressedThrowable(handlerException)
- }
+ } catch (t: Throwable) {
+ handleCoroutineExceptionImpl(context, handlerException(exception, t))
+ return
+ }
+
+ // If handler is not present in the context or exception was thrown, fallback to the global handler
+ handleCoroutineExceptionImpl(context, exception)
+}
+
+internal fun handlerException(originalException: Throwable, thrownException: Throwable): Throwable {
+ if (originalException === thrownException) return originalException
+ return RuntimeException("Exception while trying to handle coroutine exception", thrownException).apply {
+ addSuppressedThrowable(originalException)
}
}
diff --git a/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt b/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt
index 6e1af6d..3935296 100644
--- a/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt
+++ b/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt
@@ -71,7 +71,7 @@
get() = coroutineContext[Job]?.isActive ?: true
/**
- * Returns the context of this scope.
+ * Context of this scope.
*/
public val coroutineContext: CoroutineContext
}
@@ -175,7 +175,7 @@
*/
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
// todo: optimize implementation to a single allocated object
- val owner = ScopeOwnerCoroutine<R>(coroutineContext)
+ val owner = ScopeCoroutine<R>(coroutineContext)
owner.start(CoroutineStart.UNDISPATCHED, owner, block)
owner.join()
if (owner.isCancelled) {
diff --git a/common/kotlinx-coroutines-core-common/src/Deferred.kt b/common/kotlinx-coroutines-core-common/src/Deferred.kt
index 964196b..b4681a5 100644
--- a/common/kotlinx-coroutines-core-common/src/Deferred.kt
+++ b/common/kotlinx-coroutines-core-common/src/Deferred.kt
@@ -9,75 +9,44 @@
import kotlin.coroutines.experimental.*
/**
- * Deferred value is a non-blocking cancellable future.
+ * Deferred value is a non-blocking cancellable future — it is a [Job] that has a result.
*
* It is created with [async][CoroutineScope.async] coroutine builder or via constructor of [CompletableDeferred] class.
* It is in [active][isActive] state while the value is being computed.
*
- * Deferred value has the following states:
- *
- * | **State** | [isActive] | [isCompleted] | [isCompletedExceptionally] | [isCancelled] |
- * | --------------------------------------- | ---------- | ------------- | -------------------------- | ------------- |
- * | _New_ (optional initial state) | `false` | `false` | `false` | `false` |
- * | _Active_ (default initial state) | `true` | `false` | `false` | `false` |
- * | _Completing_ (optional transient state) | `true` | `false` | `false` | `false` |
- * | _Cancelling_ (optional transient state) | `false` | `false` | `false` | `true` |
- * | _Cancelled_ (final state) | `false` | `true` | `true` | `true` |
- * | _Resolved_ (final state) | `false` | `true` | `false` | `false` |
- * | _Failed_ (final state) | `false` | `true` | `true` | `false` |
+ * Deferred value has the same state machine as the [Job] with additional convenience methods to retrieve
+ * successful or failed result of the computation that was carried out. The result of the deferred is
+ * available when it is [completed][isCompleted] and can be retrieved by [await] method, which throws
+ * exception if the deferred had failed.
+ * Note, that a _cancelled_ deferred is also considered to be completed.
+ * The corresponding exception can be retrieved via [getCompletionExceptionOrNull] from a completed instance of deferred.
*
* Usually, a deferred value is created in _active_ state (it is created and started).
* However, [async][CoroutineScope.async] coroutine builder has an optional `start` parameter that creates a deferred value in _new_ state
* when this parameter is set to [CoroutineStart.LAZY].
* Such a deferred can be be made _active_ by invoking [start], [join], or [await].
*
- * A deferred can be _cancelled_ at any time with [cancel] function that forces it to transition to
- * _cancelling_ state immediately. Deferred that is not backed by a coroutine (see [CompletableDeferred]) and does not have
- * [children] becomes _cancelled_ on [cancel] immediately.
- * Otherwise, deferred becomes _cancelled_ when it finishes executing its code and
- * when all its children [complete][isCompleted].
- *
- * ```
- * wait children
- * +-----+ start +--------+ complete +-------------+ finish +-----------+
- * | New | ---------------> | Active | ----------> | Completing | ---+-> | Resolved |
- * +-----+ +--------+ +-------------+ | |(completed)|
- * | | | | +-----------+
- * | cancel | cancel | cancel |
- * V V | | +-----------+
- * +-----------+ finish +------------+ | +-> | Failed |
- * | Cancelled | <--------- | Cancelling | <---------------+ |(completed)|
- * |(completed)| +------------+ +-----------+
- * +-----------+
- * ```
- *
* A deferred value is a [Job]. A job in the
* [coroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/coroutine-context.html)
* of [async][CoroutineScope.async] builder represents the coroutine itself.
- * A deferred value is active while the coroutine is working and cancellation aborts the coroutine when
- * the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException]
- * or the cancellation cause inside the coroutine.
- *
- * A deferred value can have a _parent_ job. A deferred value with a parent is cancelled when its parent is
- * cancelled or completes. Parent waits for all its [children] to complete in _completing_ or
- * _cancelling_ state. _Completing_ state is purely internal. For an outside observer a _completing_
- * deferred is still active, while internally it is waiting for its children.
*
* All functions on this interface and on all interfaces derived from it are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
*/
public interface Deferred<out T> : Job {
/**
- * Returns `true` if computation of this deferred value has _completed exceptionally_ -- it had
- * either _failed_ with exception during computation or was [cancelled][cancel].
+ * Returns `true` if computation of this deferred value has _completed exceptionally_.
+ * It is `true` when both [isCompleted] and [isCancelled] are true.
+ * It implies that [isActive] is `false`.
*
- * It implies that [isActive] is `false` and [isCompleted] is `true`.
+ * @suppress **Deprecated**: Use [isCancelled] && [isCompleted]
*/
+ @Deprecated("Use isCancelled && isCompleted", ReplaceWith("this.isCancelled && this.isCompleted"))
public val isCompletedExceptionally: Boolean
/**
* Awaits for completion of this value without blocking a thread and resumes when deferred computation is complete,
- * returning the resulting value or throwing the corresponding exception if the deferred had completed exceptionally or was cancelled.
+ * returning the resulting value or throwing the corresponding exception if the deferred was cancelled.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
@@ -97,8 +66,7 @@
/**
* Returns *completed* result or throws [IllegalStateException] if this deferred value has not
- * [completed][isCompleted] yet. It throws the corresponding exception if this deferred has
- * [completed exceptionally][isCompletedExceptionally].
+ * [completed][isCompleted] yet. It throws the corresponding exception if this deferred was [cancelled][isCancelled].
*
* This function is designed to be used from [invokeOnCompletion] handlers, when there is an absolute certainty that
* the value is already complete. See also [getCompletionExceptionOrNull].
@@ -109,8 +77,8 @@
public fun getCompleted(): T
/**
- * Returns *completion exception* result if this deferred [completed exceptionally][isCompletedExceptionally],
- * `null` if it is completed normally, or throws [IllegalStateException] if this deferred value has not
+ * Returns *completion exception* result if this deferred was [cancelled][isCancelled] and has [completed][isCompleted],
+ * `null` if it had completed normally, or throws [IllegalStateException] if this deferred value has not
* [completed][isCompleted] yet.
*
* This function is designed to be used from [invokeOnCompletion] handlers, when there is an absolute certainty that
diff --git a/common/kotlinx-coroutines-core-common/src/Job.kt b/common/kotlinx-coroutines-core-common/src/Job.kt
index 374b87e..3713c07 100644
--- a/common/kotlinx-coroutines-core-common/src/Job.kt
+++ b/common/kotlinx-coroutines-core-common/src/Job.kt
@@ -14,59 +14,56 @@
// --------------- core job interfaces ---------------
/**
- * A background job. Conceptually, a job is a cancellable thing with a simple life-cycle that
+ * A background job. Conceptually, a job is a cancellable thing with a life-cycle that
* culminates in its completion. Jobs can be arranged into parent-child hierarchies where cancellation
- * or completion of parent immediately cancels all its [children].
+ * of parent lead to an immediate cancellation of all its [children] and vice versa.
*
* The most basic instances of [Job] are created with [launch][CoroutineScope.launch] coroutine builder or with a
- * `Job()` factory function. Other coroutine builders and primitives like
- * [Deferred] also implement [Job] interface.
+ * `Job()` factory function.
+ * Conceptually, an execution of the job does not produce a result value. Jobs are launched solely for their
+ * side-effects. See [Deferred] interface for a job that produces a result.
*
* A job has the following states:
*
- * | **State** | [isActive] | [isCompleted] | [isCancelled] |
- * | --------------------------------------- | ---------- | ------------- | ------------- |
- * | _New_ (optional initial state) | `false` | `false` | `false` |
- * | _Active_ (default initial state) | `true` | `false` | `false` |
- * | _Completing_ (optional transient state) | `true` | `false` | `false` |
- * | _Cancelling_ (optional transient state) | `false` | `false` | `true` |
- * | _Cancelled_ (final state) | `false` | `true` | `true` |
- * | _Completed_ (final state) | `false` | `true` | `false` |
+ * | **State** | [isActive] | [isCompleted] | [isCancelled] |
+ * | -------------------------------- | ---------- | ------------- | ------------- |
+ * | _New_ (optional initial state) | `false` | `false` | `false` |
+ * | _Active_ (default initial state) | `true` | `false` | `false` |
+ * | _Completing_ (transient state) | `true` | `false` | `false` |
+ * | _Cancelling_ (transient state) | `false` | `false` | `true` |
+ * | _Cancelled_ (final state) | `false` | `true` | `true` |
+ * | _Completed_ (final state) | `false` | `true` | `false` |
*
* Usually, a job is created in _active_ state (it is created and started). However, coroutine builders
* that provide an optional `start` parameter create a coroutine in _new_ state when this parameter is set to
* [CoroutineStart.LAZY]. Such a job can be made _active_ by invoking [start] or [join].
*
- * A job can be _cancelled_ at any time with [cancel] function that forces it to transition to
- * _cancelling_ state immediately. Job that is not backed by a coroutine (see `Job()` function) and does not have
- * [children] becomes _cancelled_ on [cancel] immediately.
- * Otherwise, job becomes _cancelled_ when it finishes executing its code and
- * when all its children [complete][isCompleted].
+ * A job is _active_ while the coroutine is working. Failure of the job makes it _cancelling_.
+ * A job can be cancelled it at any time with [cancel] function that forces it to transition to
+ * _cancelling_ state immediately. The job becomes _cancelled_ when it finishes executing it work.
*
* ```
- * wait children
- * +-----+ start +--------+ complete +-------------+ finish +-----------+
- * | New | ---------------> | Active | -----------> | Completing | -------> | Completed |
- * +-----+ +--------+ +-------------+ +-----------+
- * | | |
- * | cancel | cancel | cancel
- * V V |
- * +-----------+ finish +------------+ |
- * | Cancelled | <--------- | Cancelling | <----------------+
- * |(completed)| +------------+
- * +-----------+
+ * wait children
+ * +-----+ start +--------+ complete +-------------+ finish +-----------+
+ * | New | -----> | Active | ---------> | Completing | -------> | Completed |
+ * +-----+ +--------+ +-------------+ +-----------+
+ * | cancel |
+ * | +----------------+
+ * | |
+ * V V
+ * +------------+ finish +-----------+
+ * | Cancelling | --------------------------------> | Cancelled |
+ * +------------+ +-----------+
* ```
*
- * A job in the
+ * A `Job` instance in the
* [coroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/coroutine-context.html)
* represents the coroutine itself.
- * A job is active while the coroutine is working and job's cancellation aborts the coroutine when
- * the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException].
*
- * A job can have a _parent_ job. A job with a parent is cancelled when its parent is cancelled or completes exceptionally.
- * Parent job waits for all its children to complete in _completing_ or _cancelling_ state.
- * _Completing_ state is purely internal to the job. For an outside observer a _completing_ job is still active,
- * while internally it is waiting for its children.
+ * A job can have a _parent_ job. A job with a parent is cancelled when its parent is cancelled.
+ * Parent job waits in _completing_ or _cancelling_ state for all its children to complete before finishing.
+ * Note, that _completing_ state is purely internal to the job. For an outside observer a _completing_ job is still
+ * active, while internally it is waiting for its children.
*
* All functions on this interface and on all interfaces derived from it are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
@@ -97,22 +94,25 @@
// ------------ state query ------------
/**
- * Returns `true` when this job is active -- it was already started and has not completed or cancelled yet.
+ * Returns `true` when this job is active -- it was already started and has not completed or failed yet.
* The job that is waiting for its [children] to complete is still considered to be active if it
- * was not cancelled.
+ * has not failed and was not cancelled.
*/
public val isActive: Boolean
/**
- * Returns `true` when this job has completed for any reason. A job that was cancelled and has
- * finished its execution is also considered complete. Job becomes complete only after
+ * Returns `true` when this job has completed for any reason. A job that has failed or cancelled
+ * and has finished its execution is also considered complete. Job becomes complete only after
* all its [children] complete.
*/
public val isCompleted: Boolean
/**
- * Returns `true` if this job was [cancelled][cancel]. In the general case, it does not imply that the
- * job has already [completed][isCompleted] (it may still be cancelling whatever it was doing).
+ * Returns `true` if this job was cancelled for any reason, either by explicit invocation of [cancel] or
+ * because it had failed or its children or parent was cancelled.
+ * In the general case, it does not imply that the
+ * job has already [completed][isCompleted], because it may still be finishing whatever it was doing and
+ * waiting for its [children] to complete.
*/
public val isCancelled: Boolean
@@ -127,8 +127,7 @@
* returned. The [CancellationException.cause] of the resulting [CancellationException] references
* the original cancellation cause that was passed to [cancel] function.
*
- * This function throws [IllegalStateException] when invoked on a job that has not
- * [completed][isCompleted] nor [cancelled][isCancelled] yet.
+ * This function throws [IllegalStateException] when invoked on a job that is still active.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@@ -204,21 +203,23 @@
*
* A parent-child relation has the following effect:
* * Cancellation of parent with [cancel] or its exceptional completion (failure)
- * immediately cancels all its children.
+ * immediately fails all its children.
* * Parent cannot complete until all its children are complete. Parent waits for all its children to
- * complete in _completing_ or _cancelling_ state.
+ * complete in _completing_ or _cancelling_ states.
*
- * **A child must store the resulting [DisposableHandle] and [dispose][DisposableHandle.dispose] the attachment
+ * **A child must store the resulting [ChildHandle] and [dispose][DisposableHandle.dispose] the attachment
* to its parent on its own completion.**
*
* Coroutine builders and job factory functions that accept `parent` [CoroutineContext] parameter
* lookup a [Job] instance in the parent context and use this function to attach themselves as a child.
- * They also store a reference to the resulting [DisposableHandle] and dispose a handle when they complete.
+ * They also store a reference to the resulting [ChildHandle] and dispose a handle when they complete.
*
* @suppress This is an internal API. This method is too error prone for public API.
*/
+ // ChildJob and ChildHandle are made internal on purpose to further deter 3rd-party impl of Job
@InternalCoroutinesApi
- public fun attachChild(child: Job): DisposableHandle
+ @Suppress("EXPOSED_FUNCTION_RETURN_TYPE", "EXPOSED_PARAMETER_TYPE")
+ public fun attachChild(child: ChildJob): ChildHandle
/**
* Cancels all children jobs of this coroutine with the given [cause]. Unlike [cancel],
@@ -268,10 +269,10 @@
public fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle
/**
- * @suppress **Deprecated**: Use with named `onCancelling` and `handler` parameters.
+ * @suppress **Deprecated**: Use with named `onCancellation` and `handler` parameters.
*/
- @Deprecated(message = "Use with named `onCancelling` and `handler` parameters", level = DeprecationLevel.WARNING,
- replaceWith = ReplaceWith("this.invokeOnCompletion(onCancelling = onCancelling_, handler = handler)"))
+ @Deprecated(message = "Use with named `onCancellation` and `handler` parameters", level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("this.invokeOnCompletion(onCancellation = onCancelling_, handler = handler)"))
public fun invokeOnCompletion(onCancelling_: Boolean = false, handler: CompletionHandler): DisposableHandle
/**
@@ -299,10 +300,10 @@
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
/**
- * Registers handler that is **synchronously** invoked once on cancellation or completion of this job.
- * When job is already cancelling or complete, then the handler is immediately invoked
+ * Registers handler that is **synchronously** invoked once on failure or completion of this job.
+ * When job is already failing or complete, then the handler is immediately invoked
* with a job's cancellation cause or `null` unless [invokeImmediately] is set to false.
- * Otherwise, handler will be invoked once when this job is cancelled or complete.
+ * Otherwise, handler will be invoked once when this job is failing or is complete.
*
* The meaning of `cause` that is passed to the handler:
* * Cause is `null` when job has completed normally.
@@ -310,12 +311,9 @@
* **It should not be treated as an error**. In particular, it should not be reported to error logs.
* * Otherwise, the job had _failed_.
*
- * Invocation of this handler on a transition to a transient _cancelling_ state
+ * Invocation of this handler on a transition to a _failing_ state
* is controlled by [onCancelling] boolean parameter.
- * The handler is invoked on invocation of [cancel] when
- * job becomes _cancelling_ if [onCancelling] parameter is set to `true`. However,
- * when this [Job] is not backed by a coroutine, like [CompletableDeferred] or [CancellableContinuation]
- * (both of which do not posses a _cancelling_ state), then the value of [onCancelling] parameter is ignored.
+ * The handler is invoked when the job is failing when [onCancelling] parameter is set to `true`.
*
* The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the
* registration of this handler and release its memory if its invocation is no longer needed.
@@ -330,13 +328,13 @@
* This function should not be used in general application code.
* Implementations of `CompletionHandler` must be fast and _lock-free_.
*
- * @param onCancelling when `true`, then the [handler] is invoked as soon as this job transitions to _cancelling_ state;
+ * @param onCancelling when `true`, then the [handler] is invoked as soon as this job transitions to _failing_ state;
* when `false` then the [handler] is invoked only when it transitions to _completed_ state.
* @param invokeImmediately when `true` and this job is already in the desired state (depending on [onCancelling]),
* then the [handler] is immediately and synchronously invoked and no-op [DisposableHandle] is returned;
* when `false` then no-op [DisposableHandle] is returned, but the [handler] is not invoked.
* @param handler the handler.
- *
+ *
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
@@ -390,6 +388,41 @@
}
}
+// -------------------- Parent-child communication --------------------
+
+/**
+ * A reference that parent receives from its child so that it can report its failure.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+internal interface ChildJob : Job {
+ /**
+ * Parent is reporting failure to the child by invoking this method.
+ * Child finds the failure cause using [getCancellationException] of the [parentJob].
+ * This method does nothing is the child is already failing.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ public fun parentCancelled(parentJob: Job)
+}
+
+/**
+ * A handle that child keep onto its parent so that it is able to report its failure.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+internal interface ChildHandle : DisposableHandle {
+ /**
+ * Child is reporting failure to the parent by invoking this method.
+ * This method is invoked by the child twice. The first time child report its root cause as soon as possible,
+ * so that all its siblings and the parent can start finishing their work asap on failure. The second time
+ * child invokes this method when it had aggregated and determined its final termination cause.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ public fun childCancelled(cause: Throwable): Boolean
+}
+
// -------------------- Job extensions --------------------
/**
@@ -532,10 +565,22 @@
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
-public object NonDisposableHandle : DisposableHandle {
- /** Does not do anything. */
+public object NonDisposableHandle : DisposableHandle, ChildHandle {
+ /**
+ * Does not do anything.
+ * @suppress
+ */
override fun dispose() {}
- /** Returns "NonDisposableHandle" string. */
+ /**
+ * Returns `false`.
+ * @suppress
+ */
+ override fun childCancelled(cause: Throwable): Boolean = false
+
+ /**
+ * Returns "NonDisposableHandle" string.
+ * @suppress
+ */
override fun toString(): String = "NonDisposableHandle"
}
diff --git a/common/kotlinx-coroutines-core-common/src/JobSupport.kt b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
index 4a7fdab..66924ed 100644
--- a/common/kotlinx-coroutines-core-common/src/JobSupport.kt
+++ b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
@@ -12,7 +12,6 @@
/**
* A concrete implementation of [Job]. It is optionally a child to a parent job.
- * This job is cancelled when the parent is complete, but not vise-versa.
*
* This is an open class designed for extension by more specific classes that might augment the
* state and mare store addition state information for completed jobs, like their result values.
@@ -20,48 +19,47 @@
* @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
* @suppress **This is unstable API and it is subject to change.**
*/
-internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 {
+internal open class JobSupport constructor(active: Boolean) : Job, ChildJob, SelectClause0 {
final override val key: CoroutineContext.Key<*> get() = Job
/*
=== Internal states ===
- name state class public state description
- ------ ------------ ------------ -----------
- EMPTY_N EmptyNew : New no listeners
- EMPTY_A EmptyActive : Active no listeners
- SINGLE JobNode : Active a single listener
- SINGLE+ JobNode : Active a single listener + NodeList added as its next
- LIST_N NodeList : New a list of listeners (promoted once, does not got back to EmptyNew)
- LIST_A NodeList : Active a list of listeners (promoted once, does not got back to JobNode/EmptyActive)
- COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*)
- CANCELLING Finishing : Cancelling has a list of listeners (promoted once from LIST_*)
- FINAL_C Cancelled : Cancelled cancelled (final state)
- FINAL_F Failed : Completed failed for other reason (final state)
- FINAL_R <any> : Completed produced some result
+ name state class public state description
+ ------ ------------ ------------ -----------
+ EMPTY_N EmptyNew : New no listeners
+ EMPTY_A EmptyActive : Active no listeners
+ SINGLE JobNode : Active a single listener
+ SINGLE+ JobNode : Active a single listener + NodeList added as its next
+ LIST_N InactiveNodeList : New a list of listeners (promoted once, does not got back to EmptyNew)
+ LIST_A NodeList : Active a list of listeners (promoted once, does not got back to JobNode/EmptyActive)
+ COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*)
+ CANCELLING Finishing : Cancelling -- " --
+ FINAL_C Cancelled : Cancelled Cancelled (final state)
+ FINAL_R <any> : Completed produced some result
=== Transitions ===
New states Active states Inactive states
+---------+ +---------+ }
- | EMPTY_N | --+-> | EMPTY_A | ----+ } Empty states
- +---------+ | +---------+ | }
- | | | ^ | +----------+
- | | | | +--> | FINAL_* |
- | | V | | +----------+
- | | +---------+ | }
- | | | SINGLE | ----+ } JobNode states
- | | +---------+ | }
- | | | | }
- | | V | }
- | | +---------+ | }
- | +-- | SINGLE+ | ----+ }
+ | EMPTY_N | ----> | EMPTY_A | ----+ } Empty states
+ +---------+ +---------+ | }
+ | | | ^ | +----------+
+ | | | | +--> | FINAL_* |
+ | | V | | +----------+
+ | | +---------+ | }
+ | | | SINGLE | ----+ } JobNode states
+ | | +---------+ | }
+ | | | | }
+ | | V | }
+ | | +---------+ | }
+ | +-------> | SINGLE+ | ----+ }
| +---------+ | }
| | |
V V |
+---------+ +---------+ | }
- | LIST_N | ----> | LIST_A | ----+ } NodeList states
+ | LIST_N | ----> | LIST_A | ----+ } [Inactive]NodeList states
+---------+ +---------+ | }
| | | | |
| | +--------+ | |
@@ -75,16 +73,57 @@
This state machine and its transition matrix are optimized for the common case when job is created in active
- state (EMPTY_A) and at most one completion listener is added to it during its life-time.
+ state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
+ successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
+ state without going to COMPLETING state)
Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
+
+ ---------- TIMELINE of state changes and notification in Job lifecycle ----------
+
+ | The longest possible chain of events in shown, shorter versions cut-through intermediate states,
+ | while still performing all the notifications in this order.
+
+ + Job object is created
+ ## NEW: state == EMPTY_ACTIVE | is InactiveNodeList
+ + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
+ ~ waits for start
+ >> start / join / await invoked
+ ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
+ + onStartInternal / onStart (lazy coroutine is started)
+ ~ active coroutine is working (or scheduled to execution)
+ >> childCancelled / cancelImpl invoked
+ ## CANCELLING: state is Finishing, state.rootCause != null
+ ------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancellation=true) returns NonDisposableHandle
+ ------ new children get immediately cancelled, but are still admitted to the list
+ + onCancellation
+ + notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception)
+ + cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too)
+ ~ waits for completion of coroutine body
+ >> makeCompleting / makeCompletingOnce invoked
+ ## COMPLETING: state is Finishing, state.isCompleting == true
+ ------ new children are not admitted anymore, attachChild returns NonDisposableHandle
+ ~ waits for children
+ >> last child completes
+ - computes the final exception
+ ## SEALED: state is Finishing, state.isSealed == true
+ ------ cancel/childCancelled returns false (cannot handle exceptions anymore)
+ + cancelParent (final exception is communicated to the parent, parent incorporates it)
+ + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
+ ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
+ ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
+ + parentHandle.dispose
+ + notifyCompletion (invoke all completion listeners)
+ + onCompletionInternal / onCompleted / onCompletedExceptionally
+
+ ---------------------------------------------------------------------------------
*/
// Note: use shared objects while we have no listeners
private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
@Volatile
- private var parentHandle: DisposableHandle? = null
+ private var parentHandle: ChildHandle? = null
// ------------ initialization ------------
@@ -103,7 +142,7 @@
@Suppress("DEPRECATION")
val handle = parent.attachChild(this)
parentHandle = handle
- // now check our state _after_ registering (see tryFinalizeStateActually order of actions)
+ // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
if (isCompleted) {
handle.dispose()
parentHandle = NonDisposableHandle // release it just in case, to aid GC
@@ -141,61 +180,55 @@
public final override val isCancelled: Boolean get() {
val state = this.state
- return state is Cancelled || (state is Finishing && state.cancelled != null)
+ return state is CompletedExceptionally || (state is Finishing && state.isCancelling)
}
// ------------ state update ------------
- /**
- * Updates current [state] of this job to the final state, invoking all necessary handlers
- * and/or `on*` methods.
- *
- * Returns `false` if current state is not equal to expected.
- * If this method succeeds, state of this job will never be changed again
- */
- private fun tryFinalizeState(expect: Incomplete, proposedUpdate: Any?, mode: Int): Boolean =
- if (expect is Finishing && expect.cancelled != null) {
- tryFinalizeCancellingState(expect, proposedUpdate, mode)
- } else {
- val update = coerceProposedUpdate(expect, proposedUpdate)
- tryFinalizeStateActually(expect, update, mode)
+ // Finalizes Finishing -> Completed (terminal state) transition.
+ // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
+ private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
+ require(proposedUpdate !is Incomplete) // only incomplete -> completed transition is allowed
+ require(this.state === state) // consistency check -- it cannot change
+ require(!state.isSealed) // consistency check -- cannot be sealed yet
+ require(state.isCompleting) // consistency check -- must be marked as completing
+ val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
+ // Create the final exception and seal the state so that no more exceptions can be added
+ var suppressed = false
+ val finalException = synchronized(state) {
+ val exceptions = state.sealLocked(proposedException)
+ val rootCause = getFinalRootCause(state, exceptions)
+ if (rootCause != null) suppressed = suppressExceptions(rootCause, exceptions)
+ rootCause
+ }
+ // Create the final state object
+ val finalState = when {
+ // if we have not failed -> use proposed update value
+ finalException == null -> proposedUpdate
+ // small optimization when we can used proposeUpdate object as is on failure
+ finalException === proposedException -> proposedUpdate
+ // cancelled job final state
+ else -> CompletedExceptionally(finalException)
}
- // Finalizes Cancelling -> Cancelled transition
- private fun tryFinalizeCancellingState(expect: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
- /*
- * If job is in 'cancelling' state and we're finalizing job state, we start intricate dance:
- * 1) Synchronize on state to avoid races with concurrent
- * mutations (e.g. when new child is added)
- * 2) After synchronization check we're still in the expected state
- * 3) Aggregate final exception under the same lock which protects exceptions
- * collection
- * 4) Pass it upstream
- */
- val finalException = synchronized(expect) {
- if (_state.value !== expect) {
- return false
- }
- if (proposedUpdate is CompletedExceptionally) {
- expect.addExceptionLocked(proposedUpdate.cause)
- }
- /*
- * Note that new exceptions cannot be added concurrently: state is guarded by lock
- * and storage is sealed in the end, so all new exceptions will be reported separately
- */
- buildException(expect).also { expect.seal() }
+ // Now handle exception if parent can't handle it
+ if (finalException != null && !cancelParent(finalException)) {
+ handleJobException(finalException)
}
- val update = Cancelled(this, finalException ?: expect.cancelled!!.cause)
- if (tryFinalizeStateActually(expect, update, mode)) return true
- // ^^^^ this CAS never fails: we're in the state when no jobs can be attached, because state is already sealed
- val error = AssertionError("Unexpected state: ${_state.value}, expected: $expect, update: $update")
- handleOnCompletionException(error)
- throw error
+ // Then CAS to completed state -> it must succeed
+ require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
+ // And process all post-completion actions
+ completeStateFinalization(state, finalState, mode, suppressed)
+ return true
}
- private fun buildException(state: Finishing): Throwable? {
- val cancelled = state.cancelled!!
- val suppressed = state.exceptions
+ private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
+ // A case of no exceptions
+ if (exceptions.isEmpty()) {
+ // materialize cancellation exception if it was not materialized yet
+ if (state.isCancelling) return createJobCancellationException()
+ return null
+ }
/*
* This is a place where we step on our API limitation:
* We can't distinguish internal JobCancellationException from our parent
@@ -203,39 +236,44 @@
*
* But it has negative consequences: same exception can be added as suppressed more than once.
* Consider concurrent parent-child relationship:
- * 1) Child throws E1 and parent throws E2
- * 2) Parent goes to "Cancelling(E1)" and cancels child with E1
+ * 1) Child throws E1 and parent throws E2.
+ * 2) Parent goes to "Failing(E1)" and cancels child with E1
* 3) Child goes to "Cancelling(E1)", but throws an exception E2
- * 4) When child throws, it notifies parent that he is cancelling, adding its exception to parent list of exceptions
- * (again, parent don't know whether it's child exception or external exception)
- * 5) Child builds final exception: E1 with suppressed E2, reports it to parent
+ * 4) When child throws, it notifies parent that it is failing, adding its exception to parent's list of exceptions/
+ * 5) Child builds final exception: E1 with suppressed E2, reports it to parent.
* 6) Parent aggregates three exceptions: original E1, reported E2 and "final" E1.
* It filters the third exception, but adds the second one to the first one, thus adding suppressed duplicate.
*
- * Note that it's only happening when both parent and child throw exception simultaneously
+ * Note that it's only happening when both parent and child throw exception simultaneously.
*/
- var rootCause = cancelled.cause
+ var rootCause = exceptions[0]
if (rootCause is JobCancellationException) {
val cause = unwrap(rootCause)
rootCause = if (cause !== null) {
cause
} else {
- suppressed.firstOrNull { unwrap(it) != null } ?: return rootCause
- }
- }
- // TODO it should be identity set and optimized for small footprints
- val seenExceptions = HashSet<Throwable>(suppressed.size)
- suppressed.forEach {
- val unwrapped = unwrap(it)
- if (unwrapped !== null && unwrapped !== rootCause) {
- if (seenExceptions.add(unwrapped)) {
- rootCause.addSuppressedThrowable(unwrapped)
- }
+ exceptions.firstOrNull { unwrap(it) != null } ?: return rootCause
}
}
return rootCause
}
+ private fun suppressExceptions(rootCause: Throwable, exceptions: List<Throwable>): Boolean {
+ if (exceptions.size <= 1) return false // nothing more to do here
+ val seenExceptions = identitySet<Throwable>(exceptions.size)
+ var suppressed = false
+ for (i in 1 until exceptions.size) {
+ val unwrapped = unwrap(exceptions[i])
+ if (unwrapped !== null && unwrapped !== rootCause) {
+ if (seenExceptions.add(unwrapped)) {
+ rootCause.addSuppressedThrowable(unwrapped)
+ suppressed = true
+ }
+ }
+ }
+ return suppressed
+ }
+
private tailrec fun unwrap(exception: Throwable): Throwable? =
if (exception is JobCancellationException) {
val cause = exception.cause
@@ -244,29 +282,17 @@
exception
}
- /**
- * Tries to actually update [state] of this job to the final state and, if
- * succeeds, disposes parent handle (detaching child from parent), and
- * invokes all the handlers to notify on the final state transition.
- */
- private fun tryFinalizeStateActually(expect: Incomplete, update: Any?, mode: Int): Boolean {
- require(update !is Incomplete) // only incomplete -> completed transition is allowed
-
- /*
- * We're publishing CompletedExceptionally as OpDescriptor to avoid races with parent:
- * Job can't report exception before CAS (as it can fail), but after CAS there is a small window
- * where the parent is considering this job (child) completed, though child has not yet reported its exception.
- */
- val updateValue = if (update is CompletedExceptionally) HandleExceptionOp(update) else update
- if (!_state.compareAndSet(expect, updateValue)) return false // failed
- if (updateValue is HandleExceptionOp) {
- updateValue.perform(this) // help perform
- }
- completeStateFinalization(expect, update, mode)
+ // fast-path method to finalize normally completed coroutines without children
+ private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
+ check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
+ check(update !is CompletedExceptionally) // only for normal completion
+ if (!_state.compareAndSet(state, update)) return false
+ completeStateFinalization(state, update, mode, false)
return true
}
- private fun completeStateFinalization(expect: Incomplete, update: Any?, mode: Int) {
+ // suppressed == true when any exceptions were suppressed while building the final completion cause
+ private fun completeStateFinalization(state: Incomplete, update: Any?, mode: Int, suppressed: Boolean) {
/*
* Now the job in THE FINAL state. We need to properly handle the resulting state.
* Order of various invocations here is important.
@@ -277,68 +303,46 @@
it.dispose() // volatile read parentHandle _after_ state was updated
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
- val exceptionally = update as? CompletedExceptionally
+ val cause = (update as? CompletedExceptionally)?.cause
/*
- * 2) Invoke onCancellationInternal: exception handling, resource cancellation etc.
- * Only notify on cancellation once (expect.isCancelling)
+ * 2) Invoke onCancellation: for resource cancellation resource cancellation etc.
+ * Only notify is was not notified yet.
+ * Note: we do not use notifyCancelling here, since we are going to invoke all completion as our next step
*/
- if (!expect.isCancelling) {
- onCancellationInternal(exceptionally)
- }
+ if (!state.isCancelling) onCancellation(cause)
/*
* 3) Invoke completion handlers: .join(), callbacks etc.
* It's important to invoke them only AFTER exception handling, see #208
*/
- val cause = exceptionally?.cause
- if (expect is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
+ if (state is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
try {
- expect.invoke(cause)
+ state.invoke(cause)
} catch (ex: Throwable) {
- handleOnCompletionException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
+ handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
}
} else {
- expect.list?.notifyCompletion(cause)
+ state.list?.notifyCompletion(cause)
}
/*
- * 4) Invoke onCompletionInternal: onNext(), timeout deregistration etc.
+ * 4) Invoke onCompletionInternal: onNext(), timeout de-registration etc.
* It should be last so all callbacks observe consistent state
* of the job which doesn't depend on callback scheduling.
*/
- onCompletionInternal(update, mode)
+ onCompletionInternal(update, mode, suppressed)
}
- // when Job is in Cancelling state, it can only be promoted to Cancelled state,
- // so if the proposed Update is not an appropriate Cancelled (preserving the cancellation cause),
- // then the corresponding Cancelled state is constructed.
- private fun coerceProposedUpdate(expect: Incomplete, proposedUpdate: Any?): Any? =
- if (expect is Finishing && expect.cancelled != null && !isCorrespondinglyCancelled(expect.cancelled, proposedUpdate))
- createCancelled(expect.cancelled, proposedUpdate) else proposedUpdate
-
- private fun isCorrespondinglyCancelled(cancelled: Cancelled, proposedUpdate: Any?): Boolean {
- if (proposedUpdate !is Cancelled) return false
- // NOTE: equality comparison of causes is performed here by design, see equals of JobCancellationException
- return proposedUpdate.cause == cancelled.cause || proposedUpdate.cause is JobCancellationException
- }
-
- private fun createCancelled(cancelled: Cancelled, proposedUpdate: Any?): Cancelled {
- if (proposedUpdate !is CompletedExceptionally) return cancelled // not exception -- just use original cancelled
- val exception = proposedUpdate.cause
- if (cancelled.cause == exception) return cancelled // that is the cancelled we need already!
- // That could have occurred while coroutine is being cancelled.
- // Do not spam with JCE in suppressed exceptions
- if (cancelled.cause !is JobCancellationException) {
- exception.addSuppressedThrowable(cancelled.cause)
- }
- return Cancelled(this, exception)
+ private fun notifyCancelling(list: NodeList, cause: Throwable) {
+ // first cancel our own children
+ onCancellation(cause)
+ notifyHandlers<JobCancellingNode<*>>(list, cause)
+ // then report to the parent that we are failing
+ cancelParent(cause) // tentative failure report -- does not matter if there is no parent
}
private fun NodeList.notifyCompletion(cause: Throwable?) =
notifyHandlers<JobNode<*>>(this, cause)
- private fun notifyCancellation(list: NodeList, cause: Throwable?) =
- notifyHandlers<JobCancellationNode<*>>(list, cause)
-
- private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
+ private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
var exception: Throwable? = null
list.forEach<T> { node ->
try {
@@ -390,13 +394,11 @@
public final override fun getCancellationException(): CancellationException {
val state = this.state
- return when {
- state is Finishing && state.cancelled != null ->
- state.cancelled.cause.toCancellationException("Job is being cancelled")
- state is Incomplete ->
- error("Job was not completed or cancelled yet: $this")
- state is CompletedExceptionally ->
- state.cause.toCancellationException("Job has failed")
+ return when (state) {
+ is Finishing -> state.rootCause?.toCancellationException("Job is failing")
+ ?: error("Job is still new or active: $this")
+ is Incomplete -> error("Job is still new or active: $this")
+ is CompletedExceptionally -> state.cause.toCancellationException("Job has failed")
else -> JobCancellationException("Job has completed normally", null, this)
}
}
@@ -408,14 +410,16 @@
* Returns the cause that signals the completion of this job -- it returns the original
* [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
* This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
- * [isCancelled] yet.
+ * failing yet.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
*/
- protected fun getCompletionCause(): Throwable? {
- val state = this.state
- return when {
- state is Finishing && state.cancelled != null -> state.cancelled.cause
- state is Incomplete -> error("Job was not completed or cancelled yet")
- state is CompletedExceptionally -> state.cause
+ protected fun getCompletionCause(): Throwable? = loopOnState { state ->
+ return when (state) {
+ is Finishing -> state.rootCause
+ ?: error("Job is still new or active: $this")
+ is Incomplete -> error("Job is still new or active: $this")
+ is CompletedExceptionally -> state.cause
else -> null
}
}
@@ -454,13 +458,33 @@
if (list == null) { // SINGLE/SINGLE+
promoteSingleToNodeList(state as JobNode<*>)
} else {
- if (state is Finishing && state.cancelled != null && onCancelling) {
- // installing cancellation handler on job that is being cancelled
- if (invokeImmediately) handler.invokeIt(state.cancelled.cause)
- return NonDisposableHandle
+ var rootCause: Throwable? = null
+ var handle: DisposableHandle = NonDisposableHandle
+ if (onCancelling && state is Finishing) {
+ synchronized(state) {
+ // check if we are installing failing handler on job that is failing
+ rootCause = state.rootCause // != null if we are failing
+ // We add node to the list in two cases --- either the job is not failing
+ // or we are adding a child to a coroutine that is not completing yet
+ if (rootCause == null || handler.isHandlerOf<ChildHandleImpl>() && !state.isCompleting) {
+ // Note: add node the list while holding lock on state (make sure it cannot change)
+ val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
+ if (!addLastAtomic(state, list, node)) return@loopOnState // retry
+ // just return node if we don't have to invoke handler (not failing yet)
+ if (rootCause == null) return node
+ // otherwise handler is invoked immediately out of the synchronized section & handle returned
+ handle = node
+ }
+ }
}
- val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
- if (addLastAtomic(state, list, node)) return node
+ if (rootCause != null) {
+ // Note: attachChild uses invokeImmediately, so it gets invoked when adding to failing job
+ if (invokeImmediately) handler.invokeIt(rootCause)
+ return handle
+ } else {
+ val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
+ if (addLastAtomic(state, list, node)) return node
+ }
}
}
else -> { // is complete
@@ -475,10 +499,10 @@
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
return if (onCancelling)
- (handler as? JobCancellationNode<*>)?.also { require(it.job === this) }
- ?: InvokeOnCancellation(this, handler)
+ (handler as? JobCancellingNode<*>)?.also { require(it.job === this) }
+ ?: InvokeOnCancelling(this, handler)
else
- (handler as? JobNode<*>)?.also { require(it.job === this && it !is JobCancellationNode) }
+ (handler as? JobNode<*>)?.also { require(it.job === this && it !is JobCancellingNode) }
?: InvokeOnCompletion(this, handler)
}
@@ -568,98 +592,156 @@
}
/**
+ * Returns `true` for job that do not have "body block" to complete and should immediately go into
+ * completing state and start waiting for children.
+ *
* @suppress **This is unstable API and it is subject to change.**
*/
- internal open val onCancelMode: Int get() = ON_CANCEL_MAKE_CANCELLING
+ internal open val onCancelComplete: Boolean get() = false
- public override fun cancel(): Boolean = cancel(null)
+ // external cancel without cause, never invoked implicitly from internal machinery
+ public override fun cancel(): Boolean =
+ cancel(null) // must delegate here, because some classes override cancel(x)
- public override fun cancel(cause: Throwable?): Boolean = when (onCancelMode) {
- ON_CANCEL_MAKE_CANCELLING -> makeCancelling(cause)
- ON_CANCEL_MAKE_COMPLETING -> makeCompleting(Cancelled(this, cause))
- else -> error("Invalid onCancelMode $onCancelMode")
+ // external cancel with (optional) cause, never invoked implicitly from internal machinery
+ public override fun cancel(cause: Throwable?): Boolean =
+ cancelImpl(cause) && handlesException
+
+ // parent is reporting failure to a child child
+ public final override fun parentCancelled(parentJob: Job) {
+ cancelImpl(parentJob)
}
- // we will be dispatching coroutine to process its cancellation exception, so there is no need for
- // an extra check for Job status in MODE_CANCELLABLE
- private fun updateStateCancelled(state: Incomplete, cause: Throwable?) =
- tryFinalizeState(state, Cancelled(this, cause), mode = MODE_ATOMIC_DEFAULT)
+ // child was cancelled with cause
+ internal fun childCancelled(cause: Throwable): Boolean =
+ cancelImpl(cause) && handlesException
- // transitions to Cancelling state
- private fun makeCancelling(cause: Throwable?): Boolean {
+ // cause is Throwable or Job when cancelChild was invoked
+ // returns true is exception was handled, false otherwise
+ private fun cancelImpl(cause: Any?): Boolean {
+ if (onCancelComplete) {
+ // make sure it is completing, if cancelMakeCompleting returns true it means it had make it
+ // completing and had recorded exception
+ if (cancelMakeCompleting(cause)) return true
+ // otherwise just record failure via makeCancelling below
+ }
+ return makeCancelling(cause)
+ }
+
+ private fun cancelMakeCompleting(cause: Any?): Boolean {
loopOnState { state ->
- when (state) {
- is Empty -> { // EMPTY_X state -- no completion handlers
- if (state.isActive) {
- promoteEmptyToNodeList(state) // this way can wrap it into Cancelling on next pass
- } else {
- // cancelling a non-started coroutine makes it immediately cancelled
- // (and we have no listeners to notify which makes it very simple)
- if (updateStateCancelled(state, cause)) return true
- }
- }
- is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
- promoteSingleToNodeList(state)
- }
- is NodeList -> { // LIST -- active list of completion handlers
- if (tryMakeCancelling(state, state.list, cause)) return true
- }
- is InactiveNodeList -> { // LIST -- inactive list of completion handlers
- // cancelling a non-started coroutine makes it immediately cancelled
- if (updateStateCancelled(state, cause))
- return true
- }
- is Finishing -> { // Completing/Cancelling the job, may cancel
- if (state.cancelled != null) {
- if (cause == null) {
- return true
- }
- /*
- * If we failed to add an exception, then `seal` was successfully called
- * and `seal` is called only after state update => retry is liveness-safe
- */
- if (state.addException(cause)) {
- return true
- } else {
- return@loopOnState
- }
- }
- if (tryMakeCancelling(state, state.list, cause)) return true
- }
- /*
- * Filter out duplicates due to race in the following pattern:
- * T1: parent -> completion sequence
- * T2: child -> set final state -> signal with final exception to the parent
- */
- is CompletedExceptionally -> return state.cause === cause
- else -> { // is inactive
- return false
- }
+ if (state !is Incomplete || state is Finishing && state.isCompleting) {
+ return false // already completed/completing, do not even propose update
+ }
+ val proposedUpdate = CompletedExceptionally(createCauseException(cause))
+ when (tryMakeCompleting(state, proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
+ COMPLETING_ALREADY_COMPLETING -> return false
+ COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true
+ COMPLETING_RETRY -> return@loopOnState
+ else -> error("unexpected result")
}
}
}
- // try make expected state in cancelling on the condition that we're still in this state
- private fun tryMakeCancelling(expect: Incomplete, list: NodeList, cause: Throwable?): Boolean {
- val cancelled = Cancelled(this, cause)
- if (!_state.compareAndSet(expect, Finishing(list, cancelled, false))) return false
- onFinishingInternal(cancelled)
- onCancellationInternal(cancelled)
- // Materialize cause
- notifyCancellation(list, cancelled.cause)
+ private fun createJobCancellationException() =
+ JobCancellationException("Job was cancelled", null, this)
+
+ // cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
+ private fun createCauseException(cause: Any?): Throwable = when(cause) {
+ is Throwable? -> cause ?: createJobCancellationException()
+ else -> (cause as Job).getCancellationException()
+ }
+
+ // transitions to Failing state
+ // cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
+ private fun makeCancelling(cause: Any?): Boolean {
+ var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
+ loopOnState { state ->
+ when (state) {
+ is Finishing -> { // already finishing -- collect exceptions
+ val notifyRootCause = synchronized(state) {
+ if (state.isSealed) return false // too late, already sealed -- cannot add exception nor mark cancelled
+ // add exception, do nothing is parent is cancelling child that is already being cancelled
+ val wasCancelling = state.isCancelling // will notify if was not failing
+ // Materialize missing exception if it is the first exception (otherwise -- don't)
+ if (cause != null || !wasCancelling) {
+ val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
+ state.addExceptionLocked(causeException)
+ }
+ // take cause for notification is was not cancelling before
+ state.rootCause.takeIf { !wasCancelling }
+ }
+ notifyRootCause?.let { notifyCancelling(state.list, it) }
+ return true
+ }
+ is Incomplete -> {
+ // Not yet finishing -- try to make it failing
+ val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
+ if (state.isActive) {
+ // active state becomes failing
+ if (tryMakeFailing(state, causeException)) return true
+ } else {
+ // non active state starts completing
+ when (tryMakeCompleting(state, CompletedExceptionally(causeException), mode = MODE_ATOMIC_DEFAULT)) {
+ COMPLETING_ALREADY_COMPLETING -> error("Cannot happen in $state")
+ COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true // ok
+ COMPLETING_RETRY -> return@loopOnState
+ else -> error("unexpected result")
+ }
+ }
+ }
+ else -> return false // already complete
+ }
+ }
+ }
+
+ // Performs promotion of incomplete coroutine state to NodeList for the purpose of
+ // converting coroutine state to Failing, returns null when need to retry
+ private fun getOrPromoteFailingList(state: Incomplete): NodeList? = state.list ?:
+ when (state) {
+ is Empty -> NodeList() // we can allocate new empty list that'll get integrated into Failing state
+ is JobNode<*> -> {
+ // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
+ // correctly capture a reference to it
+ promoteSingleToNodeList(state)
+ null // retry
+ }
+ else -> error("State should have list: $state")
+ }
+
+ // try make new failing state on the condition that we're still in the expected state
+ private fun tryMakeFailing(state: Incomplete, rootCause: Throwable): Boolean {
+ check(state !is Finishing) // only for non-finishing states
+ check(state.isActive) // only for active states
+ // get state's list or else promote to list to correctly operate on child lists
+ val list = getOrPromoteFailingList(state) ?: return false
+ // Create failing state (with rootCause!)
+ val failing = Finishing(list, false, rootCause)
+ if (!_state.compareAndSet(state, failing)) return false
+ // Notify listeners
+ notifyCancelling(list, rootCause)
return true
}
/**
+ * This function is used by [CompletableDeferred.complete] (and exceptionally) and by [JobImpl.cancel].
+ * It returns `false` on repeated invocation (when this job is already completing).
+ *
* @suppress **This is unstable API and it is subject to change.**
*/
- internal fun makeCompleting(proposedUpdate: Any?): Boolean =
- when (makeCompletingInternal(proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
- COMPLETING_ALREADY_COMPLETING -> false
- else -> true
+ internal fun makeCompleting(proposedUpdate: Any?): Boolean = loopOnState { state ->
+ when (tryMakeCompleting(state, proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
+ COMPLETING_ALREADY_COMPLETING -> return false
+ COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true
+ COMPLETING_RETRY -> return@loopOnState
+ else -> error("unexpected result")
}
+ }
/**
+ * This function is used by [AbstractCoroutine.resume].
+ * It throws exception on repeated invocation (when this job is already completing).
+ *
* Returns:
* * `true` if state was updated to completed/cancelled;
* * `false` if made completing or it is cancelling and is waiting for children.
@@ -667,98 +749,106 @@
* @throws IllegalStateException if job is already complete or completing
* @suppress **This is unstable API and it is subject to change.**
*/
- internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean =
- when (makeCompletingInternal(proposedUpdate, mode)) {
- COMPLETING_COMPLETED -> true
- COMPLETING_WAITING_CHILDREN -> false
- else -> throw IllegalStateException("Job $this is already complete or completing, " +
+ internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean = loopOnState { state ->
+ when (tryMakeCompleting(state, proposedUpdate, mode)) {
+ COMPLETING_ALREADY_COMPLETING -> throw IllegalStateException("Job $this is already complete or completing, " +
"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
- }
-
- private fun makeCompletingInternal(proposedUpdate: Any?, mode: Int): Int {
- loopOnState { state ->
- if (state !is Incomplete)
- return COMPLETING_ALREADY_COMPLETING
- if (state is Finishing && state.completing)
- return COMPLETING_ALREADY_COMPLETING
- val child: ChildJob? = firstChild(state) ?: // or else complete immediately w/o children
- when {
- state !is Finishing && hasOnFinishingHandler(proposedUpdate) -> null // unless it has onFinishing handler
- tryFinalizeState(state, proposedUpdate, mode) -> return COMPLETING_COMPLETED
- else -> return@loopOnState
- }
- val list = state.list ?: // must promote to list to correctly operate on child lists
- when (state) {
- is Empty -> {
- promoteEmptyToNodeList(state)
- return@loopOnState // retry
- }
- is JobNode<*> -> {
- promoteSingleToNodeList(state)
- return@loopOnState // retry
- }
- else -> error("Unexpected state with an empty list: $state")
- }
- // cancel all children in list on exceptional completion
- if (proposedUpdate is CompletedExceptionally)
- child?.cancelChildrenInternal(proposedUpdate.cause)
- // switch to completing state
- val cancelled = (state as? Finishing)?.cancelled ?: (proposedUpdate as? Cancelled)
- val completing = Finishing(list, cancelled, true)
- if (_state.compareAndSet(state, completing)) {
- (state as? Finishing)?.transferExceptions(completing)
- if (state !is Finishing) onFinishingInternal(proposedUpdate)
- if (child != null && tryWaitForChild(child, proposedUpdate))
- return COMPLETING_WAITING_CHILDREN
- if (tryFinalizeState(completing, proposedUpdate, mode))
- return COMPLETING_COMPLETED
- }
+ COMPLETING_COMPLETED -> return true
+ COMPLETING_WAITING_CHILDREN -> return false
+ COMPLETING_RETRY -> return@loopOnState
+ else -> error("unexpected result")
}
}
- private tailrec fun ChildJob.cancelChildrenInternal(cause: Throwable) {
- childJob.cancel(JobCancellationException("Child job was cancelled because of parent failure", cause, childJob))
- nextChild()?.cancelChildrenInternal(cause)
+ private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?, mode: Int): Int {
+ if (state !is Incomplete)
+ return COMPLETING_ALREADY_COMPLETING
+ /*
+ * FAST PATH -- no children to wait for && simple state (no list) && not failing => can complete immediately
+ * Failures always have to go through Finishing state to serialize exception handling.
+ * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
+ * which may miss unhandled exception.
+ */
+ if ((state is Empty || state is JobNode<*>) && state !is ChildHandleImpl && proposedUpdate !is CompletedExceptionally) {
+ if (!tryFinalizeSimpleState(state, proposedUpdate, mode)) return COMPLETING_RETRY
+ return COMPLETING_COMPLETED
+ }
+ // get state's list or else promote to list to correctly operate on child lists
+ val list = getOrPromoteFailingList(state) ?: return COMPLETING_RETRY
+ // promote to Finishing state if we are not in it yet
+ // This promotion has to be atomic w.r.t to state change, so that a coroutine that is not active yet
+ // atomically transition to finishing & completing state
+ val finishing = state as? Finishing ?: Finishing(list, false, null)
+ // must synchronize updates to finishing state
+ var notifyRootCause: Throwable? = null
+ synchronized(finishing) {
+ // check if this state is already completing
+ if (finishing.isCompleting) return COMPLETING_ALREADY_COMPLETING
+ // mark as completing
+ finishing.isCompleting = true
+ // if we need to promote to finishing then atomically do it here.
+ // We do it as early is possible while still holding the lock. This ensures that we cancelImpl asap
+ // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
+ if (finishing !== state) {
+ if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
+ }
+ // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
+ require(!finishing.isSealed) // cannot be sealed
+ // add new proposed exception to the finishing state
+ val wasCancelling = finishing.isCancelling
+ (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
+ // If it just becomes failing --> must process failing notifications
+ notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
+ }
+ // process failing notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)
+ notifyRootCause?.let { notifyCancelling(list, it) }
+ // now wait for children
+ val child = firstChild(state)
+ if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
+ return COMPLETING_WAITING_CHILDREN
+ // otherwise -- we have not children left (all were already cancelled?)
+ if (tryFinalizeFinishingState(finishing, proposedUpdate, mode))
+ return COMPLETING_COMPLETED
+ // otherwise retry
+ return COMPLETING_RETRY
}
private val Any?.exceptionOrNull: Throwable?
get() = (this as? CompletedExceptionally)?.cause
private fun firstChild(state: Incomplete) =
- state as? ChildJob ?: state.list?.nextChild()
+ state as? ChildHandleImpl ?: state.list?.nextChild()
// return false when there is no more incomplete children to wait
- private tailrec fun tryWaitForChild(child: ChildJob, proposedUpdate: Any?): Boolean {
- val handle = child.childJob.invokeOnCompletion(invokeImmediately = false,
- handler = ChildCompletion(this, child, proposedUpdate).asHandler)
+ // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
+ private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleImpl, proposedUpdate: Any?): Boolean {
+ val handle = child.childJob.invokeOnCompletion(
+ invokeImmediately = false,
+ handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
+ )
if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
val nextChild = child.nextChild() ?: return false
- return tryWaitForChild(nextChild, proposedUpdate)
+ return tryWaitForChild(state, nextChild, proposedUpdate)
}
- /**
- * @suppress **This is unstable API and it is subject to change.**
- */
- internal fun continueCompleting(lastChild: ChildJob, proposedUpdate: Any?) {
- loopOnState { state ->
- if (state !is Finishing)
- throw IllegalStateException("Job $this is found in expected state while completing with $proposedUpdate", proposedUpdate.exceptionOrNull)
- // figure out if we need to wait for next child
- val waitChild = lastChild.nextChild()
- // try wait for next child
- if (waitChild != null && tryWaitForChild(waitChild, proposedUpdate)) return // waiting for next child
- // no more children to wait -- try update state
- if (tryFinalizeState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return
- }
+ // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
+ private fun continueCompleting(state: Finishing, lastChild: ChildHandleImpl, proposedUpdate: Any?) {
+ require(this.state === state) // consistency check -- it cannot change while we are waiting for children
+ // figure out if we need to wait for next child
+ val waitChild = lastChild.nextChild()
+ // try wait for next child
+ if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
+ // no more children to wait -- try update state
+ if (tryFinalizeFinishingState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return
}
- private fun LockFreeLinkedListNode.nextChild(): ChildJob? {
+ private fun LockFreeLinkedListNode.nextChild(): ChildHandleImpl? {
var cur = this
while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
while (true) {
cur = cur.nextNode
if (cur.isRemoved) continue
- if (cur is ChildJob) return cur
+ if (cur is ChildHandleImpl) return cur
if (cur is NodeList) return null // checked all -- no more children
}
}
@@ -766,16 +856,26 @@
public final override val children: Sequence<Job> get() = buildSequence {
val state = this@JobSupport.state
when (state) {
- is ChildJob -> yield(state.childJob)
+ is ChildHandleImpl -> yield(state.childJob)
is Incomplete -> state.list?.let { list ->
- list.forEach<ChildJob> { yield(it.childJob) }
+ list.forEach<ChildHandleImpl> { yield(it.childJob) }
}
}
}
@Suppress("OverridingDeprecatedMember")
- public final override fun attachChild(child: Job): DisposableHandle =
- invokeOnCompletion(onCancelling = true, handler = ChildJob(this, child).asHandler)
+ public final override fun attachChild(child: ChildJob): ChildHandle {
+ /*
+ * Note: This function attaches a special ChildNode object. This node object
+ * is handled in a special way on completion on the coroutine (we wait for all of them) and
+ * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
+ * if the job is already failing. For "failing" state child is attached under state lock.
+ * It's required to properly wait all children before completion and provide linearizable hierarchy view:
+ * If child is attached when job is failing, such child will receive immediate notification on failure,
+ * but parent *will* wait for that child before completion and will handle its exception.
+ */
+ return invokeOnCompletion(onCancelling = true, handler = ChildHandleImpl(this, child).asHandler) as ChildHandle
+ }
@Suppress("OverridingDeprecatedMember")
public final override fun cancelChildren(cause: Throwable?) {
@@ -792,40 +892,51 @@
}
/**
- * This function is invoked once when job is cancelled or is completed.
- * It's an optimization for [invokeOnCompletion] with `onCancelling` set to `true`.
+ * This function is invoked once when job is failing or is completed.
+ * It's an optimization for [invokeOnCompletion] with `onCancellation` set to `true`.
*
- * @param exceptionally not null when the the job was cancelled or completed exceptionally,
- * null when it has completed normally.
* @suppress **This is unstable API and it is subject to change.*
*/
- internal open fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
- // TODO rename to "onCancelling"
+ protected open fun onCancellation(cause: Throwable?) {}
+
+ /**
+ * When this function returns `true` the parent fails on the failure of this job.
+ *
+ * @suppress **This is unstable API and it is subject to change.*
+ */
+ protected open val cancelsParent: Boolean get() = false
+
+ /**
+ * Returns `true` for jobs that handle their exceptions via [handleJobException] or integrate them
+ * into the job's result via [onCompletionInternal]. The only instance of the [Job] that does not
+ * handle its exceptions is [JobImpl].
+ *
+ * @suppress **This is unstable API and it is subject to change.*
+ */
+ protected open val handlesException: Boolean get() = true
+
+ /**
+ * This method is invoked **exactly once** when the final exception of the job is determined
+ * and before it becomes complete. At the moment of invocation the job and all its children are complete.
+ *
+ * @suppress **This is unstable API and it is subject to change.*
+ */
+ protected open fun handleJobException(exception: Throwable) {}
+
+ private fun cancelParent(cause: Throwable): Boolean {
+ if (cause is CancellationException) return true
+ if (!cancelsParent) return false
+ return parentHandle?.childCancelled(cause) == true
}
/**
- * Whether job has [onFinishingInternal] handler for given [update]
- * @suppress **This is unstable API and it is subject to change.**
- */
- internal open fun hasOnFinishingHandler(update: Any?) = false
-
- /**
- * @suppress **This is unstable API and it is subject to change.**
- */
- internal open fun onFinishingInternal(update: Any?) {}
-
- /**
- * Method which is invoked once Job becomes [Cancelled] or [CompletedExceptionally].
- * It's guaranteed that at the moment of invocation the job and all its children are complete.
- */
- internal open fun handleJobException(exception: Throwable) {}
-
- /**
* Override for post-completion actions that need to do something with the state.
+ * @param state the final state.
* @param mode completion mode.
+ * @param suppressed true when any exceptions were suppressed while building the final completion cause.
* @suppress **This is unstable API and it is subject to change.**
*/
- internal open fun onCompletionInternal(state: Any?, mode: Int) {}
+ internal open fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {}
// for nicer debugging
public override fun toString(): String =
@@ -840,89 +951,99 @@
val state = this.state
return when (state) {
is Finishing -> buildString {
- if (state.cancelled != null) append("Cancelling")
- if (state.completing) append("Completing")
+ when {
+ state.isCancelling -> append("Cancelling")
+ else -> append("Active")
+ }
+ if (state.isCompleting) append("Completing")
}
is Incomplete -> if (state.isActive) "Active" else "New"
- is Cancelled -> "Cancelled"
- is CompletedExceptionally -> "CompletedExceptionally"
+ is CompletedExceptionally -> "Cancelled"
else -> "Completed"
}
}
- // Cancelling or Completing
+ // Completing, Failing, Cancelling states,
+ // All updates are guarded by synchronized(this), reads are volatile
@Suppress("UNCHECKED_CAST")
private class Finishing(
override val list: NodeList,
- @JvmField val cancelled: Cancelled?, /* != null when cancelling */
- @JvmField val completing: Boolean /* true when completing */
+ @Volatile
+ @JvmField var isCompleting: Boolean,
+ @Volatile
+ @JvmField var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
) : SynchronizedObject(), Incomplete {
- override val isActive: Boolean get() = cancelled == null
+ @Volatile
+ private var _exceptionsHolder: Any? = null // Contains null | Throwable | ArrayList | SEALED
+ // NotE: cannot be modified when sealed
+ val isSealed: Boolean get() = _exceptionsHolder === SEALED
+ val isCancelling: Boolean get() = rootCause != null
+ override val isActive: Boolean get() = rootCause == null // !isCancelling
+
+ // Seals current state and returns list of exceptions
// guarded by `synchronized(this)`
- val exceptions: List<Throwable> get() = when(_exceptionsHolder) {
- NOT_INITIALIZED -> emptyList()
- is Throwable -> listOf(_exceptionsHolder as Throwable) // EA should handle this
- else -> (_exceptionsHolder as List<Throwable>)
+ fun sealLocked(proposedException: Throwable?): List<Throwable> {
+ val eh = _exceptionsHolder // volatile read
+ val list = when(eh) {
+ null -> allocateList()
+ is Throwable -> allocateList().also { it.add(eh) }
+ is ArrayList<*> -> eh as ArrayList<Throwable>
+ else -> error("State is $eh") // already sealed -- cannot happen
+ }
+ val rootCause = this.rootCause // volatile read
+ rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
+ if (proposedException != null && proposedException != rootCause) list.add(proposedException)
+ _exceptionsHolder = SEALED
+ return list
}
// guarded by `synchronized(this)`
- // Contains null | NOT_INITIALIZED | Throwable | ArrayList
- private var _exceptionsHolder: Any? = if (cancelled == null) null else NOT_INITIALIZED
-
- fun addException(exception: Throwable): Boolean =
- synchronized(this) {
- addExceptionLocked(exception)
+ fun addExceptionLocked(exception: Throwable) {
+ val rootCause = this.rootCause // volatile read
+ if (rootCause == null) {
+ this.rootCause = exception
+ return
}
-
- // guarded by `synchronized(this)`
- fun addExceptionLocked(exception: Throwable): Boolean =
- when (_exceptionsHolder) {
- null -> false
- NOT_INITIALIZED -> {
- _exceptionsHolder = exception
- true
- }
+ if (exception === rootCause) return // nothing to do
+ val eh = _exceptionsHolder // volatile read
+ when (eh) {
+ null -> _exceptionsHolder = exception
is Throwable -> {
- val previous = _exceptionsHolder
- _exceptionsHolder = ArrayList<Any?>(4).apply {
- add(previous)
+ if (exception === eh) return // nothing to do
+ _exceptionsHolder = allocateList().apply {
+ add(eh)
add(exception)
}
- true
}
- else -> (_exceptionsHolder as ArrayList<Throwable>).add(exception)
- }
-
- /**
- * Seals current state. After [seal] call all consecutive calls to [addException]
- * return `false` forcing callers to handle pending exception by themselves.
- * This call should be guarded by `synchronized(this)`
- */
- fun seal() {
- _exceptionsHolder = null
- }
-
- fun transferExceptions(to: Finishing) {
- synchronized(this) {
- synchronized(to) {
- val holder = _exceptionsHolder
- when(holder) {
- // Note: "to" cannot be sealed at this point and adding exception there mush succeed.
- is Throwable -> require(to.addExceptionLocked(holder))
- is List<*> -> holder.forEach {
- require(to.addExceptionLocked(it as Throwable))
- }
- }
- seal()
- }
+ is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception)
+ else -> error("State is $eh") // already sealed -- cannot happen
}
}
+
+ private fun allocateList() = ArrayList<Throwable>(4)
+
+ override fun toString(): String =
+ "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$_exceptionsHolder, list=$list]"
}
private val Incomplete.isCancelling: Boolean
- get() = this is Finishing && cancelled != null
+ get() = this is Finishing && isCancelling
+
+ // Used by parent that is waiting for child completion
+ private class ChildCompletion(
+ private val parent: JobSupport,
+ private val state: Finishing,
+ private val child: ChildHandleImpl,
+ private val proposedUpdate: Any?
+ ) : JobNode<Job>(child.childJob) {
+ override fun invoke(cause: Throwable?) {
+ parent.continueCompleting(state, child, proposedUpdate)
+ }
+ override fun toString(): String =
+ "ChildCompletion[$child, $proposedUpdate]"
+ }
/*
* =================================================================================================
@@ -1019,34 +1140,20 @@
else
block.startCoroutineCancellable(state as T, select.completion)
}
-
- class HandleExceptionOp(val original: CompletedExceptionally) : OpDescriptor() {
-
- override fun perform(affected: Any?): Any? {
- val job = (affected as JobSupport)
- if (job._state.compareAndSet(this, original)) {
- job.handleJobException(original.cause)
- }
-
- return null
- }
- }
}
// --------------- helper classes & constants for job implementation
-internal const val ON_CANCEL_MAKE_CANCELLING = 0
-internal const val ON_CANCEL_MAKE_COMPLETING = 1
-
private const val COMPLETING_ALREADY_COMPLETING = 0
private const val COMPLETING_COMPLETED = 1
private const val COMPLETING_WAITING_CHILDREN = 2
+private const val COMPLETING_RETRY = 3
private const val RETRY = -1
private const val FALSE = 0
private const val TRUE = 1
-private val NOT_INITIALIZED = Symbol("NOT_INITIALIZED")
+private val SEALED = Symbol("SEALED")
private val EMPTY_NEW = Empty(false)
private val EMPTY_ACTIVE = Empty(true)
@@ -1058,13 +1165,8 @@
internal class JobImpl(parent: Job? = null) : JobSupport(true) {
init { initParentJobInternal(parent) }
- override val onCancelMode: Int get() = ON_CANCEL_MAKE_COMPLETING
-
- override fun cancel(cause: Throwable?): Boolean {
- // JobImpl can't handle an exception, thus always returns false
- super.cancel(cause)
- return false
- }
+ override val onCancelComplete get() = true
+ override val handlesException: Boolean get() = false
}
// -------- invokeOnCompletion nodes
@@ -1159,51 +1261,41 @@
// -------- invokeOnCancellation nodes
/**
- * Marker for node that shall be invoked on cancellation (in _cancelling_ state).
- * **Note: may be invoked multiple times during cancellation.**
+ * Marker for node that shall be invoked on in _failing_ state.
+ * **Note: may be invoked multiple times.**
*/
-internal abstract class JobCancellationNode<out J : Job>(job: J) : JobNode<J>(job)
+internal abstract class JobCancellingNode<out J : Job>(job: J) : JobNode<J>(job)
-private class InvokeOnCancellation(
+private class InvokeOnCancelling(
job: Job,
private val handler: CompletionHandler
-) : JobCancellationNode<Job>(job) {
+) : JobCancellingNode<Job>(job) {
// delegate handler shall be invoked at most once, so here is an additional flag
- private val _invoked = atomic(0)
+ private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
override fun invoke(cause: Throwable?) {
if (_invoked.compareAndSet(0, 1)) handler.invoke(cause)
}
- override fun toString() = "InvokeOnCancellation[$classSimpleName@$hexAddress]"
+ override fun toString() = "InvokeOnCancelling[$classSimpleName@$hexAddress]"
}
-internal class ChildJob(
+internal class ChildHandleImpl(
parent: JobSupport,
- @JvmField val childJob: Job
-) : JobCancellationNode<JobSupport>(parent) {
- override fun invoke(cause: Throwable?) {
- // Always materialize the actual instance of parent's completion exception and cancel child with it
- childJob.cancel(job.getCancellationException())
- }
- override fun toString(): String = "ChildJob[$childJob]"
+ @JvmField val childJob: ChildJob
+) : JobCancellingNode<JobSupport>(parent), ChildHandle {
+ override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
+ override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
+ override fun toString(): String = "ChildHandle[$childJob]"
}
-// Same as ChildJob, but for cancellable continuation
+// Same as ChildHandleImpl, but for cancellable continuation
internal class ChildContinuation(
parent: Job,
@JvmField val child: AbstractContinuation<*>
-) : JobCancellationNode<Job>(parent) {
+) : JobCancellingNode<Job>(parent) {
override fun invoke(cause: Throwable?) {
child.cancel(job.getCancellationException())
}
- override fun toString(): String = "ChildContinuation[$child]"
+ override fun toString(): String =
+ "ChildContinuation[$child]"
}
-private class ChildCompletion(
- private val parent: JobSupport,
- private val child: ChildJob,
- private val proposedUpdate: Any?
-) : JobNode<Job>(child.childJob) {
- override fun invoke(cause: Throwable?) {
- parent.continueCompleting(child, proposedUpdate)
- }
-}
diff --git a/common/kotlinx-coroutines-core-common/src/NonCancellable.kt b/common/kotlinx-coroutines-core-common/src/NonCancellable.kt
index d87b929..1d60aa4 100644
--- a/common/kotlinx-coroutines-core-common/src/NonCancellable.kt
+++ b/common/kotlinx-coroutines-core-common/src/NonCancellable.kt
@@ -116,7 +116,7 @@
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
- override fun cancel(cause: Throwable?): Boolean = false
+ override fun cancel(cause: Throwable?): Boolean = false // never handles exceptions
/**
* Always returns [emptySequence].
@@ -127,12 +127,12 @@
get() = emptySequence()
/**
- * Always returns no-op handle and does not do anything.
+ * Always returns [NonDisposableHandle] and does not do anything.
* @suppress **This an internal API and should not be used from general code.**
*/
- @Suppress("OverridingDeprecatedMember")
+ @Suppress("EXPOSED_FUNCTION_RETURN_TYPE", "EXPOSED_PARAMETER_TYPE")
@InternalCoroutinesApi
- override fun attachChild(child: Job): DisposableHandle = NonDisposableHandle
+ override fun attachChild(child: ChildJob): ChildHandle = NonDisposableHandle
/**
* Does not do anything.
diff --git a/common/kotlinx-coroutines-core-common/src/Scheduled.kt b/common/kotlinx-coroutines-core-common/src/Scheduled.kt
index 251696e..1545843 100644
--- a/common/kotlinx-coroutines-core-common/src/Scheduled.kt
+++ b/common/kotlinx-coroutines-core-common/src/Scheduled.kt
@@ -36,8 +36,25 @@
replaceWith = ReplaceWith("withTimeout(unit.toMillis(time), block)")
)
public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend CoroutineScope.() -> T): T =
- withTimeout(time.convertToMillis(unit), block)
+ withTimeout(unit.toMillis(time), block)
+/**
+ * Runs a given suspending block of code inside a coroutine with a specified timeout and returns
+ * `null` if this timeout was exceeded.
+ *
+ * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
+ * cancellable suspending function inside the block throws [TimeoutCancellationException].
+ * **NB**: this method is exception-unsafe. Even if the code in the block suppresses [TimeoutCancellationException], this
+ * invocation of `withTimeoutOrNull` still returns `null` and thrown exception will be ignored.
+ *
+ * The sibling function that throws exception on timeout is [withTimeout].
+ * Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
+ *
+ * This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
+ * implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
+ *
+ * @param time timeout time in milliseconds.
+ */
@Deprecated(level = DeprecationLevel.HIDDEN, message = "binary compatibility")
public suspend fun <T> withTimeoutOrNull(time: Int, block: suspend CoroutineScope.() -> T): T? =
withTimeoutOrNull(time.toLong(), TimeUnit.MILLISECONDS, block)
diff --git a/common/kotlinx-coroutines-core-common/src/Timeout.kt b/common/kotlinx-coroutines-core-common/src/Timeout.kt
index 0403d83..5c9ea07 100644
--- a/common/kotlinx-coroutines-core-common/src/Timeout.kt
+++ b/common/kotlinx-coroutines-core-common/src/Timeout.kt
@@ -96,7 +96,7 @@
}
@Suppress("UNCHECKED_CAST")
- internal override fun onCompletionInternal(state: Any?, mode: Int) {
+ internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
if (state is CompletedExceptionally)
uCont.resumeUninterceptedWithExceptionMode(state.cause, mode)
else
diff --git a/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt b/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt
index 96d71e0..3da7fd6 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt
@@ -111,17 +111,16 @@
override val channel: SendChannel<E>
get() = this
- override fun cancel(): Boolean = super.cancel()
- public override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
+ override fun cancel(cause: Throwable?): Boolean {
+ val wasCancelled = _channel.cancel(cause)
+ if (wasCancelled) super.cancel(cause) // cancel the job
+ return wasCancelled
+ }
- override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
- val cause = exceptionally?.cause
- val processed = when (exceptionally) {
- is Cancelled -> _channel.cancel(cause) // producer coroutine was cancelled -- cancel channel
- else -> _channel.close(cause) // producer coroutine has completed -- close channel
- }
- if (!processed && cause != null)
- handleCoroutineException(context, cause, this)
+ override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
+ val cause = (state as? CompletedExceptionally)?.cause
+ val processed = _channel.close(cause)
+ if (cause != null && !processed && suppressed) handleExceptionViaHandler(context, cause)
}
}
diff --git a/common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt b/common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt
index 1e3c4d0..dfb5112 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/ChannelCoroutine.kt
@@ -14,6 +14,11 @@
) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
val channel: Channel<E> get() = this
- override fun cancel(): Boolean = super.cancel()
- override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
+ override fun cancel() = cancel(null)
+
+ override fun cancel(cause: Throwable?): Boolean {
+ val wasCancelled = _channel.cancel(cause)
+ if (wasCancelled) super.cancel(cause) // cancel the job
+ return wasCancelled
+ }
}
diff --git a/common/kotlinx-coroutines-core-common/src/channels/Produce.kt b/common/kotlinx-coroutines-core-common/src/channels/Produce.kt
index ba93b00..b7e75de 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/Produce.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/Produce.kt
@@ -168,18 +168,12 @@
private class ProducerCoroutine<E>(
parentContext: CoroutineContext, channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E>, ProducerJob<E> {
-
override val isActive: Boolean
get() = super<ChannelCoroutine>.isActive
- override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
- val cause = exceptionally?.cause
- val processed = when (exceptionally) {
- // producer coroutine was cancelled -- cancel channel, but without cause if it was closed without cause
- is Cancelled -> _channel.cancel(if (cause is CancellationException) null else cause)
- else -> _channel.close(cause) // producer coroutine has completed -- close channel
- }
- if (!processed && cause != null)
- handleCoroutineException(context, cause, this)
+ override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
+ val cause = (state as? CompletedExceptionally)?.cause
+ val processed = _channel.close(cause)
+ if (cause != null && !processed && suppressed) handleExceptionViaHandler(context, cause)
}
}
diff --git a/common/kotlinx-coroutines-core-common/src/internal/Concurrent.common.kt b/common/kotlinx-coroutines-core-common/src/internal/Concurrent.common.kt
index 682fbbe..be4b57c 100644
--- a/common/kotlinx-coroutines-core-common/src/internal/Concurrent.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/internal/Concurrent.common.kt
@@ -20,3 +20,5 @@
}
internal expect inline fun <T> ReentrantLock.withLock(action: () -> T): T
+
+internal expect fun <E> identitySet(expectedSize: Int): MutableSet<E>
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt b/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt
index e2e7475..d087bc8 100644
--- a/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt
+++ b/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt
@@ -7,20 +7,13 @@
import kotlinx.coroutines.experimental.*
import kotlin.coroutines.experimental.*
-internal class ScopeOwnerCoroutine<R>(
+/**
+ * This is a coroutine instance that is created by [coroutineScope] builder.
+ * It is just a vanilla instance of [AbstractCoroutine].
+ */
+internal class ScopeCoroutine<R>(
parentContext: CoroutineContext
-) : AbstractCoroutine<R>(parentContext, true), CoroutineScope {
-
- override val coroutineContext: CoroutineContext = parentContext + this
-
- /*
- * Always return true, so final exception is in the scope before its completion.
- */
- override fun cancel(cause: Throwable?): Boolean {
- super.cancel(cause)
- return true
- }
-}
+) : AbstractCoroutine<R>(parentContext, true)
internal class ContextScope(context: CoroutineContext) : CoroutineScope {
override val coroutineContext: CoroutineContext = context
diff --git a/common/kotlinx-coroutines-core-common/src/selects/Select.kt b/common/kotlinx-coroutines-core-common/src/selects/Select.kt
index ee9fe73..bbec635 100644
--- a/common/kotlinx-coroutines-core-common/src/selects/Select.kt
+++ b/common/kotlinx-coroutines-core-common/src/selects/Select.kt
@@ -289,19 +289,19 @@
private fun initCancellability() {
val parent = context[Job] ?: return
val newRegistration = parent.invokeOnCompletion(
- onCancelling = true, handler = SelectOnCancellation(parent).asHandler)
+ onCancelling = true, handler = SelectOnCancelling(parent).asHandler)
parentHandle = newRegistration
// now check our state _after_ registering
if (isSelected) newRegistration.dispose()
}
- private inner class SelectOnCancellation(job: Job) : JobCancellationNode<Job>(job) {
+ private inner class SelectOnCancelling(job: Job) : JobCancellingNode<Job>(job) {
// Note: may be invoked multiple times, but only the first trySelect succeeds anyway
override fun invoke(cause: Throwable?) {
if (trySelect(null))
resumeSelectCancellableWithException(job.getCancellationException())
}
- override fun toString(): String = "SelectOnCancellation[${this@SelectBuilderImpl}]"
+ override fun toString(): String = "SelectOnCancelling[${this@SelectBuilderImpl}]"
}
private val state: Any? get() {
diff --git a/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt b/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt
index 80a5705..06e45f5 100644
--- a/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/AbstractCoroutineTest.kt
@@ -4,7 +4,6 @@
package kotlinx.coroutines.experimental
-import kotlin.coroutines.experimental.*
import kotlin.test.*
class AbstractCoroutineTest : TestBase() {
diff --git a/common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt b/common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt
index 50ce50d..83be9e6 100644
--- a/common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/AsyncLazyTest.kt
@@ -20,7 +20,7 @@
expect(2)
assertTrue(!d.isActive && !d.isCompleted)
assertEquals(d.await(), 42)
- assertTrue(!d.isActive && d.isCompleted && !d.isCompletedExceptionally)
+ assertTrue(!d.isActive && d.isCompleted && !d.isCancelled)
expect(4)
assertEquals(d.await(), 42) // second await -- same result
finish(5)
@@ -38,7 +38,7 @@
expect(2)
assertTrue(!d.isActive && !d.isCompleted)
assertEquals(d.await(), 42)
- assertTrue(!d.isActive && d.isCompleted && !d.isCompletedExceptionally)
+ assertTrue(!d.isActive && d.isCompleted && !d.isCancelled)
expect(5)
assertEquals(d.await(), 42) // second await -- same result
finish(6)
@@ -67,7 +67,7 @@
expect(5)
assertTrue(!d.isActive && !d.isCompleted)
assertEquals(d.await(), 42) // starts computing
- assertTrue(!d.isActive && d.isCompleted && !d.isCompletedExceptionally)
+ assertTrue(!d.isActive && d.isCompleted && !d.isCancelled)
finish(8)
}
@@ -113,7 +113,7 @@
try {
d.await() // will throw IOException
} catch (e: TestException) {
- assertTrue(!d.isActive && d.isCompleted && d.isCompletedExceptionally && !d.isCancelled)
+ assertTrue(!d.isActive && d.isCompleted && d.isCancelled)
expect(4)
}
finish(5)
@@ -133,7 +133,7 @@
expect(3)
assertTrue(!d.start())
yield() // yield to started coroutine
- assertTrue(!d.isActive && d.isCompleted && !d.isCompletedExceptionally) // and it finishes
+ assertTrue(!d.isActive && d.isCompleted && !d.isCancelled) // and it finishes
expect(5)
assertEquals(d.await(), 42) // await sees result
finish(6)
@@ -151,7 +151,7 @@
expect(2)
assertTrue(!d.isActive && !d.isCompleted)
assertTrue(d.cancel())
- assertTrue(!d.isActive && d.isCompleted && d.isCompletedExceptionally && d.isCancelled)
+ assertTrue(!d.isActive && d.isCompleted && d.isCancelled)
assertTrue(!d.cancel())
assertTrue(!d.start())
finish(3)
@@ -179,9 +179,9 @@
expect(5)
assertTrue(d.isActive && !d.isCompleted && !d.isCancelled)
assertTrue(d.cancel())
- assertTrue(!d.isActive && !d.isCompletedExceptionally && d.isCancelled) // cancelling !
+ assertTrue(!d.isActive && d.isCancelled) // cancelling !
assertTrue(d.cancel())
- assertTrue(!d.isActive && !d.isCompletedExceptionally && d.isCancelled) // still cancelling
+ assertTrue(!d.isActive && d.isCancelled) // still cancelling
finish(6)
assertEquals(d.await(), 42) // await shall throw CancellationException
expectUnreached()
diff --git a/common/kotlinx-coroutines-core-common/test/AwaitTest.kt b/common/kotlinx-coroutines-core-common/test/AwaitTest.kt
index 71a3823..8a56fb1 100644
--- a/common/kotlinx-coroutines-core-common/test/AwaitTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/AwaitTest.kt
@@ -86,7 +86,7 @@
}
yield()
- require(d.isCompleted && d2.isCompletedExceptionally && d3.isActive)
+ require(d.isCompleted && d2.isCancelled && d3.isActive)
d3.cancel()
finish(6)
}
@@ -203,9 +203,9 @@
@Test
fun testAwaitAllFullyCompletedExceptionally() = runTest {
val d1 = CompletableDeferred<Unit>(parent = null)
- .apply { completeExceptionally(TestException()) }
+ .apply { cancel(TestException()) }
val d2 = CompletableDeferred<Unit>(parent = null)
- .apply { completeExceptionally(TestException()) }
+ .apply { cancel(TestException()) }
val job = async { expect(3) }
expect(1)
try {
diff --git a/common/kotlinx-coroutines-core-common/test/CompletableDeferredTest.kt b/common/kotlinx-coroutines-core-common/test/CompletableDeferredTest.kt
index 4b78b30..f9a71b8 100644
--- a/common/kotlinx-coroutines-core-common/test/CompletableDeferredTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/CompletableDeferredTest.kt
@@ -19,7 +19,6 @@
assertEquals(true, c.isActive)
assertEquals(false, c.isCancelled)
assertEquals(false, c.isCompleted)
- assertEquals(false, c.isCompletedExceptionally)
assertThrows<IllegalStateException> { c.getCancellationException() }
assertThrows<IllegalStateException> { c.getCompleted() }
assertThrows<IllegalStateException> { c.getCompletionExceptionOrNull() }
@@ -38,32 +37,12 @@
assertEquals(false, c.isActive)
assertEquals(false, c.isCancelled)
assertEquals(true, c.isCompleted)
- assertEquals(false, c.isCompletedExceptionally)
assertTrue(c.getCancellationException() is JobCancellationException)
assertEquals("OK", c.getCompleted())
assertEquals(null, c.getCompletionExceptionOrNull())
}
@Test
- fun testCompleteWithException() {
- val c = CompletableDeferred<String>()
- assertEquals(true, c.completeExceptionally(TestException()))
- checkCompleteTestException(c)
- assertEquals(false, c.completeExceptionally(TestException()))
- checkCompleteTestException(c)
- }
-
- private fun checkCompleteTestException(c: CompletableDeferred<String>) {
- assertEquals(false, c.isActive)
- assertEquals(false, c.isCancelled)
- assertEquals(true, c.isCompleted)
- assertEquals(true, c.isCompletedExceptionally)
- assertTrue(c.getCancellationException() is JobCancellationException)
- assertThrows<TestException> { c.getCompleted() }
- assertTrue(c.getCompletionExceptionOrNull() is TestException)
- }
-
- @Test
fun testCancel() {
val c = CompletableDeferred<String>()
assertEquals(true, c.cancel())
@@ -76,7 +55,6 @@
assertEquals(false, c.isActive)
assertEquals(true, c.isCancelled)
assertEquals(true, c.isCompleted)
- assertEquals(true, c.isCompletedExceptionally)
assertThrows<CancellationException> { c.getCompleted() }
assertTrue(c.getCompletionExceptionOrNull() is CancellationException)
}
@@ -94,7 +72,6 @@
assertEquals(false, c.isActive)
assertEquals(true, c.isCancelled)
assertEquals(true, c.isCompleted)
- assertEquals(true, c.isCompletedExceptionally)
assertTrue(c.getCancellationException() is JobCancellationException)
assertThrows<TestException> { c.getCompleted() }
assertTrue(c.getCompletionExceptionOrNull() is TestException)
@@ -108,7 +85,11 @@
parent.cancel()
assertEquals(false, parent.isActive)
assertEquals(true, parent.isCancelled)
- checkCancel(c)
+ assertEquals(false, c.isActive)
+ assertEquals(true, c.isCancelled)
+ assertEquals(true, c.isCompleted)
+ assertThrows<CancellationException> { c.getCompleted() }
+ assertTrue(c.getCompletionExceptionOrNull() is CancellationException)
}
@Test
@@ -128,8 +109,8 @@
val c = CompletableDeferred<String>(parent)
checkFresh(c)
assertEquals(true, parent.isActive)
- assertEquals(true, c.completeExceptionally(TestException()))
- checkCompleteTestException(c)
+ assertEquals(true, c.cancel(TestException()))
+ checkCancelWithException(c)
assertEquals(true, parent.isActive)
}
diff --git a/common/kotlinx-coroutines-core-common/test/CoroutineExceptionHandlerTest.kt b/common/kotlinx-coroutines-core-common/test/CoroutineExceptionHandlerTest.kt
index 366f1e4..80ddd46 100644
--- a/common/kotlinx-coroutines-core-common/test/CoroutineExceptionHandlerTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/CoroutineExceptionHandlerTest.kt
@@ -7,23 +7,42 @@
import kotlin.test.*
class CoroutineExceptionHandlerTest : TestBase() {
+ // Parent Job() does not handle exception --> handler is invoked on child crash
@Test
- fun testCoroutineExceptionHandlerCreator() = runTest {
+ fun testJob() = runTest {
expect(1)
var coroutineException: Throwable? = null
val handler = CoroutineExceptionHandler { _, ex ->
coroutineException = ex
expect(3)
}
-
- val job = launch(handler + Job()) {
+ val parent = Job()
+ val job = launch(handler + parent) {
throw TestException()
}
-
expect(2)
job.join()
finish(4)
assertTrue(coroutineException is TestException)
+ assertTrue(parent.isCancelled)
+ }
+
+ // Parent CompletableDeferred() "handles" exception --> handler is NOT invoked on child crash
+ @Test
+ fun testCompletableDeferred() = runTest {
+ expect(1)
+ val handler = CoroutineExceptionHandler { _, ex ->
+ expectUnreached()
+ }
+ val parent = CompletableDeferred<Unit>()
+ val job = launch(handler + parent) {
+ throw TestException()
+ }
+ expect(2)
+ job.join()
+ finish(3)
+ assertTrue(parent.isCancelled)
+ assertTrue(parent.getCompletionExceptionOrNull() is TestException)
}
private class TestException: RuntimeException()
diff --git a/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt b/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt
index 916186d..db59aa5 100644
--- a/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt
@@ -293,7 +293,7 @@
expect(4)
throw TestException("Crashed")
}
- launch(parent, CoroutineStart.UNDISPATCHED) {
+ val child = launch(parent, CoroutineStart.UNDISPATCHED) {
expect(2)
try {
yield() // to test
@@ -309,6 +309,8 @@
expect(6)
parent.join() // make sure crashed parent still waits for its child
finish(8)
+ // make sure is cancelled
+ assertTrue(child.isCancelled)
}
@Test
diff --git a/common/kotlinx-coroutines-core-common/test/FailedJobTest.kt b/common/kotlinx-coroutines-core-common/test/FailedJobTest.kt
new file mode 100644
index 0000000..975e376
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/test/FailedJobTest.kt
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
+
+package kotlinx.coroutines.experimental
+
+import kotlin.test.*
+
+// see https://github.com/Kotlin/kotlinx.coroutines/issues/585
+class FailedJobTest : TestBase() {
+ @Test
+ fun testCancelledJob() = runTest {
+ expect(1)
+ val job = launch {
+ expectUnreached()
+ }
+ expect(2)
+ job.cancelAndJoin()
+ finish(3)
+ assertTrue(job.isCompleted)
+ assertTrue(!job.isActive)
+ assertTrue(job.isCancelled)
+ }
+
+ @Test
+ fun testFailedJob() = runTest(
+ unhandled = listOf({it -> it is TestException })
+ ) {
+ expect(1)
+ val job = launch(NonCancellable) {
+ expect(3)
+ throw TestException()
+ }
+ expect(2)
+ job.join()
+ finish(4)
+ assertTrue(job.isCompleted)
+ assertTrue(!job.isActive)
+ assertTrue(job.isCancelled)
+ }
+
+ @Test
+ fun testFailedChildJob() = runTest(
+ unhandled = listOf({it -> it is TestException })
+ ) {
+ expect(1)
+ val job = launch(NonCancellable) {
+ expect(3)
+ launch {
+ throw TestException()
+ }
+ }
+ expect(2)
+ job.join()
+ finish(4)
+ assertTrue(job.isCompleted)
+ assertTrue(!job.isActive)
+ assertTrue(job.isCancelled)
+ }
+
+ private class TestException : Exception()
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/test/JobStatesTest.kt b/common/kotlinx-coroutines-core-common/test/JobStatesTest.kt
new file mode 100644
index 0000000..9306157
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/test/JobStatesTest.kt
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
+
+package kotlinx.coroutines.experimental
+
+import kotlin.test.*
+
+/**
+ * Tests that the transitions to the state of the [Job] correspond to documentation in the
+ * table that is presented in the [Job] documentation.
+ */
+class JobStatesTest : TestBase() {
+ @Test
+ public fun testNormalCompletion() = runTest {
+ expect(1)
+ val job = launch(start = CoroutineStart.LAZY) {
+ expect(2)
+ // launches child
+ launch {
+ expect(4)
+ }
+ // completes normally
+ }
+ // New job
+ assertFalse(job.isActive)
+ assertFalse(job.isCompleted)
+ assertFalse(job.isCancelled)
+ // New -> Active
+ job.start()
+ assertTrue(job.isActive)
+ assertFalse(job.isCompleted)
+ assertFalse(job.isCancelled)
+ // Active -> Completing
+ yield() // scheduled & starts child
+ expect(3)
+ assertTrue(job.isActive)
+ assertFalse(job.isCompleted)
+ assertFalse(job.isCancelled)
+ // Completing -> Completed
+ yield()
+ finish(5)
+ assertFalse(job.isActive)
+ assertTrue(job.isCompleted)
+ assertFalse(job.isCancelled)
+ }
+
+ @Test
+ public fun testCompletingFailed() = runTest(
+ unhandled = listOf({ it -> it is TestException })
+ ) {
+ expect(1)
+ val job = launch(NonCancellable, start = CoroutineStart.LAZY) {
+ expect(2)
+ // launches child
+ launch {
+ expect(4)
+ throw TestException()
+ }
+ // completes normally
+ }
+ // New job
+ assertFalse(job.isActive)
+ assertFalse(job.isCompleted)
+ assertFalse(job.isCancelled)
+ // New -> Active
+ job.start()
+ assertTrue(job.isActive)
+ assertFalse(job.isCompleted)
+ assertFalse(job.isCancelled)
+ // Active -> Completing
+ yield() // scheduled & starts child
+ expect(3)
+ assertTrue(job.isActive)
+ assertFalse(job.isCompleted)
+ assertFalse(job.isCancelled)
+ // Completing -> Cancelled
+ yield()
+ finish(5)
+ assertFalse(job.isActive)
+ assertTrue(job.isCompleted)
+ assertTrue(job.isCancelled)
+ }
+
+ @Test
+ public fun testFailed() = runTest(
+ unhandled = listOf({ it -> it is TestException })
+ ) {
+ expect(1)
+ val job = launch(NonCancellable, start = CoroutineStart.LAZY) {
+ expect(2)
+ // launches child
+ launch(start = CoroutineStart.ATOMIC) {
+ expect(4)
+ }
+ // failing
+ throw TestException()
+ }
+ // New job
+ assertFalse(job.isActive)
+ assertFalse(job.isCompleted)
+ assertFalse(job.isCancelled)
+ // New -> Active
+ job.start()
+ assertTrue(job.isActive)
+ assertFalse(job.isCompleted)
+ assertFalse(job.isCancelled)
+ // Active -> Cancelling
+ yield() // scheduled & starts child
+ expect(3)
+ assertFalse(job.isActive)
+ assertFalse(job.isCompleted)
+ assertTrue(job.isCancelled)
+ // Cancelling -> Cancelled
+ yield()
+ finish(5)
+ assertFalse(job.isActive)
+ assertTrue(job.isCompleted)
+ assertTrue(job.isCancelled)
+ }
+
+ @Test
+ public fun testCancelling() = runTest {
+ expect(1)
+ val job = launch(NonCancellable, start = CoroutineStart.LAZY) {
+ expect(2)
+ // launches child
+ launch(start = CoroutineStart.ATOMIC) {
+ expect(4)
+ }
+ // completes normally
+ }
+ // New job
+ assertFalse(job.isActive)
+ assertFalse(job.isCompleted)
+ assertFalse(job.isCancelled)
+ // New -> Active
+ job.start()
+ assertTrue(job.isActive)
+ assertFalse(job.isCompleted)
+ assertFalse(job.isCancelled)
+ // Active -> Completing
+ yield() // scheduled & starts child
+ expect(3)
+ assertTrue(job.isActive)
+ assertFalse(job.isCompleted)
+ assertFalse(job.isCancelled)
+ // Completing -> Cancelling
+ job.cancel()
+ assertFalse(job.isActive)
+ assertFalse(job.isCompleted)
+ assertTrue(job.isCancelled)
+ // Cancelling -> Cancelled
+ yield()
+ finish(5)
+ assertFalse(job.isActive)
+ assertTrue(job.isCompleted)
+ assertTrue(job.isCancelled)
+ }
+
+ private class TestException : Exception()
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt
index 5c455e4..9b7cc1d 100644
--- a/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt
@@ -62,8 +62,7 @@
send(2) // will get cancelled
} catch (e: Exception) {
finish(6)
- check(e is JobCancellationException && e.job == coroutineContext[Job])
- check(e.cause is TestException)
+ check(e is TestException)
throw e
}
expectUnreached()
diff --git a/core/kotlinx-coroutines-core/src/Builders.kt b/core/kotlinx-coroutines-core/src/Builders.kt
index 8e2b9df..8c93f4c 100644
--- a/core/kotlinx-coroutines-core/src/Builders.kt
+++ b/core/kotlinx-coroutines-core/src/Builders.kt
@@ -55,7 +55,7 @@
if (privateEventLoop) require(eventLoop is BlockingEventLoop)
}
- override fun onCompletionInternal(state: Any?, mode: Int) {
+ override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
// wake up blocked thread
if (Thread.currentThread() != blockedThread)
LockSupport.unpark(blockedThread)
diff --git a/core/kotlinx-coroutines-core/src/CoroutineExceptionHandlerImpl.kt b/core/kotlinx-coroutines-core/src/CoroutineExceptionHandlerImpl.kt
index 836592b..d570f29 100644
--- a/core/kotlinx-coroutines-core/src/CoroutineExceptionHandlerImpl.kt
+++ b/core/kotlinx-coroutines-core/src/CoroutineExceptionHandlerImpl.kt
@@ -21,8 +21,15 @@
internal actual fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable) {
// use additional extension handlers
for (handler in handlers) {
- handler.handleException(context, exception)
+ try {
+ handler.handleException(context, exception)
+ } catch (t: Throwable) {
+ // Use thread's handler if custom handler failed to handle exception
+ val currentThread = Thread.currentThread()
+ currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, handlerException(exception, t))
+ }
}
+
// use thread's handler
val currentThread = Thread.currentThread()
currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
diff --git a/core/kotlinx-coroutines-core/src/channels/Actor.kt b/core/kotlinx-coroutines-core/src/channels/Actor.kt
index ed64588..abdc340 100644
--- a/core/kotlinx-coroutines-core/src/channels/Actor.kt
+++ b/core/kotlinx-coroutines-core/src/channels/Actor.kt
@@ -201,9 +201,8 @@
_channel.cancel(cause)
}
- override fun handleJobException(exception: Throwable) {
- handleCoroutineException(context, exception, this)
- }
+ override val cancelsParent: Boolean get() = true
+ override fun handleJobException(exception: Throwable) = handleExceptionViaHandler(parentContext, exception)
}
private class LazyActorCoroutine<E>(
diff --git a/core/kotlinx-coroutines-core/src/internal/Concurrent.kt b/core/kotlinx-coroutines-core/src/internal/Concurrent.kt
index 457440f..8204b97 100644
--- a/core/kotlinx-coroutines-core/src/internal/Concurrent.kt
+++ b/core/kotlinx-coroutines-core/src/internal/Concurrent.kt
@@ -4,6 +4,7 @@
package kotlinx.coroutines.experimental.internal
+import java.util.*
import java.util.concurrent.*
import kotlin.concurrent.withLock as withLockJvm
@@ -13,3 +14,5 @@
internal actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLock
internal actual inline fun <T> ReentrantLock.withLock(action: () -> T) = this.withLockJvm(action)
+
+internal actual fun <E> identitySet(expectedSize: Int): MutableSet<E> = Collections.newSetFromMap(IdentityHashMap(expectedSize))
diff --git a/core/kotlinx-coroutines-core/test/AsyncJvmTest.kt b/core/kotlinx-coroutines-core/test/AsyncJvmTest.kt
index b30c8bc..f5bcba7 100644
--- a/core/kotlinx-coroutines-core/test/AsyncJvmTest.kt
+++ b/core/kotlinx-coroutines-core/test/AsyncJvmTest.kt
@@ -27,11 +27,11 @@
expect(2)
yield() // to async
expect(4)
- check(d.isActive && !d.isCompleted && !d.isCompletedExceptionally)
+ check(d.isActive && !d.isCompleted && !d.isCancelled)
check(d.cancel())
- check(!d.isActive && !d.isCompleted && !d.isCompletedExceptionally)
- check(d.cancel()) // second attempt returns false
- check(!d.isActive && !d.isCompleted && !d.isCompletedExceptionally)
+ check(!d.isActive && !d.isCompleted && d.isCancelled)
+ check(d.cancel()) // second attempt still returns true (still cancelling)
+ check(!d.isActive && !d.isCompleted && d.isCancelled)
expect(5)
try {
d.await() // awaits
@@ -40,7 +40,9 @@
expect(7)
check(e is CancellationException)
}
- check(!d.isActive && d.isCompleted && d.isCompletedExceptionally && d.isCancelled)
+ check(!d.isActive && d.isCompleted && d.isCancelled)
+ check(!d.cancel()) // now cancel return false -- already cancelled
+ check(!d.isActive && d.isCompleted && d.isCancelled)
finish(8)
}
}
diff --git a/core/kotlinx-coroutines-core/test/AwaitJvmTest.kt b/core/kotlinx-coroutines-core/test/AwaitJvmTest.kt
index df36a20..b52c4ca 100644
--- a/core/kotlinx-coroutines-core/test/AwaitJvmTest.kt
+++ b/core/kotlinx-coroutines-core/test/AwaitJvmTest.kt
@@ -12,7 +12,7 @@
// This test is to make sure that handlers installed on the second deferred do not leak
val d1 = CompletableDeferred<Int>()
val d2 = CompletableDeferred<Int>()
- d1.completeExceptionally(TestException()) // first is crashed
+ d1.cancel(TestException()) // first is crashed
val iterations = 3_000_000 * stressTestMultiplier
for (iter in 1..iterations) {
try {
diff --git a/core/kotlinx-coroutines-core/test/CoroutinesJvmTest.kt b/core/kotlinx-coroutines-core/test/CoroutinesJvmTest.kt
index d693cde..ba8bf1e 100644
--- a/core/kotlinx-coroutines-core/test/CoroutinesJvmTest.kt
+++ b/core/kotlinx-coroutines-core/test/CoroutinesJvmTest.kt
@@ -28,8 +28,8 @@
repeat(n) {
// create a child that already completed
val child = launch(start = CoroutineStart.UNDISPATCHED) { /* do nothing */ }
- // attach it manually
- parent.attachChild(child)
+ // attach it manually via internal API
+ parent.attachChild(child as ChildJob)
}
parent.cancelAndJoin() // cancel parent, make sure no stack overflow
}
diff --git a/core/kotlinx-coroutines-core/test/JobChildStressTest.kt b/core/kotlinx-coroutines-core/test/JobChildStressTest.kt
new file mode 100644
index 0000000..fca7ff6
--- /dev/null
+++ b/core/kotlinx-coroutines-core/test/JobChildStressTest.kt
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.experimental
+
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+import kotlin.test.*
+
+class JobChildStressTest : TestBase() {
+ private val N_ITERATIONS = 10_000 * stressTestMultiplier
+ private val pool = newFixedThreadPoolContext(3, "JobChildStressTest")
+
+ @After
+ fun tearDown() {
+ pool.close()
+ }
+
+ /**
+ * Perform concurrent launch of a child job & cancellation of the explicit parent job
+ */
+ @Test
+ @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
+ fun testChild() = runTest {
+ val barrier = CyclicBarrier(3)
+ repeat(N_ITERATIONS) {
+ var wasLaunched = false
+ var unhandledException: Throwable? = null
+ val handler = CoroutineExceptionHandler { _, ex ->
+ unhandledException = ex
+ }
+ val scope = CoroutineScope(pool + handler)
+ val parent = CompletableDeferred<Unit>()
+ // concurrent child launcher
+ val launcher = scope.launch {
+ barrier.await()
+ // A: launch child for a parent job
+ launch(parent) {
+ wasLaunched = true
+ throw TestException()
+ }
+ }
+ // concurrent cancel
+ val canceller = scope.launch {
+ barrier.await()
+ // B: cancel parent job of a child
+ parent.cancel()
+ }
+ barrier.await()
+ joinAll(launcher, canceller, parent)
+ assertNull(unhandledException)
+ if (wasLaunched) {
+ val exception = parent.getCompletionExceptionOrNull()
+ assertTrue(exception is TestException, "exception=$exception")
+ }
+ }
+ }
+
+ private class TestException : Exception()
+}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/test/TestBase.kt b/core/kotlinx-coroutines-core/test/TestBase.kt
index 325e76f..7d576cf 100644
--- a/core/kotlinx-coroutines-core/test/TestBase.kt
+++ b/core/kotlinx-coroutines-core/test/TestBase.kt
@@ -63,7 +63,9 @@
error.compareAndSet(null, cause)
println("$message: $cause")
cause.printStackTrace(System.out)
- }
+ println("--- Detected at ---")
+ Throwable().printStackTrace(System.out)
+ }
/**
* Throws [IllegalStateException] when `value` is false like `check` in stdlib, but also ensures that the
@@ -149,7 +151,6 @@
!unhandled[exCount - 1](e) ->
printError("Unhandled exception was unexpected: $e", e)
}
- context[Job]?.cancel(e)
})
} catch (e: Throwable) {
ex = e
diff --git a/core/kotlinx-coroutines-core/test/channels/ActorTest.kt b/core/kotlinx-coroutines-core/test/channels/ActorTest.kt
index a2eff95..478d95e 100644
--- a/core/kotlinx-coroutines-core/test/channels/ActorTest.kt
+++ b/core/kotlinx-coroutines-core/test/channels/ActorTest.kt
@@ -128,7 +128,7 @@
job.await()
expectUnreached()
} catch (e: CancellationException) {
- assertTrue(e.message?.contains("Job was cancelled normally") ?: false)
+ assertTrue(e.message?.contains("Job was cancelled") ?: false)
}
finish(3)
diff --git a/core/kotlinx-coroutines-core/test/exceptions/CoroutineExceptionHandlerJvmTest.kt b/core/kotlinx-coroutines-core/test/exceptions/CoroutineExceptionHandlerJvmTest.kt
new file mode 100644
index 0000000..b9fbfc9
--- /dev/null
+++ b/core/kotlinx-coroutines-core/test/exceptions/CoroutineExceptionHandlerJvmTest.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.experimental.exceptions
+
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+import org.junit.Test
+import kotlin.test.*
+
+class CoroutineExceptionHandlerJvmTest : TestBase() {
+
+ private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler()
+ private lateinit var caughtException: Throwable
+
+ @Before
+ fun setUp() {
+ Thread.setDefaultUncaughtExceptionHandler({ _, e -> caughtException = e })
+ }
+
+ @After
+ fun tearDown() {
+ Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
+ }
+
+ @Test
+ fun testFailingHandler() = runBlocking {
+ expect(1)
+ val job = GlobalScope.launch(CoroutineExceptionHandler { _, _ -> throw AssertionError() }) {
+ expect(2)
+ throw TestException()
+ }
+
+ job.join()
+ assertTrue(caughtException is RuntimeException)
+ assertTrue(caughtException.cause is AssertionError)
+ assertTrue(caughtException.suppressed()[0] is TestException)
+
+ finish(3)
+ }
+
+ private class TestException : Throwable()
+}
diff --git a/core/kotlinx-coroutines-core/test/exceptions/Exceptions.kt b/core/kotlinx-coroutines-core/test/exceptions/Exceptions.kt
index ba8428f..d97a433 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/Exceptions.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/Exceptions.kt
@@ -34,17 +34,22 @@
}
class CapturingHandler : AbstractCoroutineContextElement(CoroutineExceptionHandler),
- CoroutineExceptionHandler {
- val unhandled: MutableList<Throwable> = Collections.synchronizedList(ArrayList<Throwable>())!!
+ CoroutineExceptionHandler
+{
+ private var unhandled: ArrayList<Throwable>? = ArrayList()
- override fun handleException(context: CoroutineContext, exception: Throwable) {
- unhandled.add(exception)
+ override fun handleException(context: CoroutineContext, exception: Throwable) = synchronized<Unit>(this) {
+ unhandled!!.add(exception)
}
- fun getException(): Throwable {
- val size = unhandled.size
+ fun getExceptions(): List<Throwable> = synchronized(this) {
+ return unhandled!!.also { unhandled = null }
+ }
+
+ fun getException(): Throwable = synchronized(this) {
+ val size = unhandled!!.size
assert(size == 1) { "Expected one unhandled exception, but have $size: $unhandled" }
- return unhandled[0]
+ return unhandled!![0].also { unhandled = null }
}
}
@@ -63,5 +68,5 @@
): List<Throwable> {
val handler = CapturingHandler()
runBlocking(context + handler, block = block)
- return handler.unhandled
+ return handler.getExceptions()
}
diff --git a/core/kotlinx-coroutines-core/test/exceptions/JobBasicCancellationTest.kt b/core/kotlinx-coroutines-core/test/exceptions/JobBasicCancellationTest.kt
index 08bbb98..8421d5e 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/JobBasicCancellationTest.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/JobBasicCancellationTest.kt
@@ -150,8 +150,8 @@
@Test
fun testConsecutiveCancellation() {
val deferred = CompletableDeferred<Int>()
- assertTrue(deferred.completeExceptionally(IndexOutOfBoundsException()))
- assertFalse(deferred.completeExceptionally(AssertionError()))
+ assertTrue(deferred.cancel(IndexOutOfBoundsException()))
+ assertFalse(deferred.cancel(AssertionError())) // second cancelled is too late
val cause = deferred.getCancellationException().cause!!
assertTrue(cause is IndexOutOfBoundsException)
assertNull(cause.cause)
diff --git a/core/kotlinx-coroutines-core/test/exceptions/JobExceptionHandlingTest.kt b/core/kotlinx-coroutines-core/test/exceptions/JobExceptionHandlingTest.kt
index 4d50096..f91399b 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/JobExceptionHandlingTest.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/JobExceptionHandlingTest.kt
@@ -270,4 +270,36 @@
assertTrue(suppressed[0] is IOException)
assertTrue(suppressed[1] is IllegalArgumentException)
}
+
+ @Test
+ fun testBadException() = runTest(unhandled = listOf({e -> e is BadException})) {
+ val job = launch(Job()) {
+ expect(2)
+ launch {
+ expect(3)
+ throw BadException()
+ }
+
+ launch(start = CoroutineStart.ATOMIC) {
+ expect(4)
+ throw BadException()
+ }
+
+ yield()
+ BadException()
+ }
+
+ expect(1)
+ yield()
+ yield()
+ expect(5)
+ job.join()
+ finish(6)
+ }
+
+ private class BadException : Exception() {
+ override fun hashCode(): Int {
+ throw AssertionError()
+ }
+ }
}
diff --git a/core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt b/core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt
index 878e3ba..bd0b597 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt
@@ -33,36 +33,39 @@
val job = launch(NonCancellable) {
launch(start = CoroutineStart.ATOMIC) {
barrier.await()
- throw ArithmeticException()
+ throw TestException1()
}
launch(start = CoroutineStart.ATOMIC) {
barrier.await()
- throw IOException()
+ throw TestException2()
}
launch(start = CoroutineStart.ATOMIC) {
barrier.await()
- throw IllegalArgumentException()
+ throw TestException3()
}
- delay(Long.MAX_VALUE)
+ delay(1000) // to avoid OutOfMemory errors....
}
barrier.await()
job.join()
}
-
val classes = mutableSetOf(
- IllegalArgumentException::class,
- IOException::class, ArithmeticException::class)
-
+ TestException1::class,
+ TestException2::class,
+ TestException3::class
+ )
val suppressedExceptions = exception.suppressed().toSet()
assertTrue(classes.remove(exception::class),
- "Failed to remove ${exception::class} from $suppressedExceptions")
-
+ "Failed to remove ${exception::class} from $suppressedExceptions"
+ )
for (throwable in suppressedExceptions.toSet()) { // defensive copy
assertTrue(classes.remove(throwable::class),
"Failed to remove ${throwable::class} from $suppressedExceptions")
}
-
assertTrue(classes.isEmpty(), "Expected all exception to be present, but following exceptions are missing: $classes")
}
}
+
+ private class TestException1 : Exception()
+ private class TestException2 : Exception()
+ private class TestException3 : Exception()
}
diff --git a/core/kotlinx-coroutines-core/test/exceptions/JobNestedExceptionsTest.kt b/core/kotlinx-coroutines-core/test/exceptions/JobNestedExceptionsTest.kt
index 5affae1..5763bb4 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/JobNestedExceptionsTest.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/JobNestedExceptionsTest.kt
@@ -68,22 +68,19 @@
fun testNestedAtomicThrow() {
val exception = runBlock {
expect(1)
- val job = launch(NonCancellable, start = CoroutineStart.ATOMIC) {
+ val job = launch(NonCancellable + CoroutineName("outer"), start = CoroutineStart.ATOMIC) {
expect(2)
-
- launch(start = CoroutineStart.ATOMIC) {
- expect(3)
+ launch(CoroutineName("nested"), start = CoroutineStart.ATOMIC) {
+ expect(4)
throw IOException()
}
-
+ expect(3)
throw ArithmeticException()
}
-
job.join()
- finish(4)
+ finish(5)
}
-
- assertTrue(exception is ArithmeticException)
+ assertTrue(exception is ArithmeticException, "Found $exception")
checkException<IOException>(exception.suppressed()[0])
}
@@ -91,31 +88,33 @@
fun testChildThrowsDuringCompletion() {
val exceptions = runBlockForMultipleExceptions {
expect(1)
- val job = launch(NonCancellable, start = CoroutineStart.ATOMIC) {
+ val job = launch(NonCancellable + CoroutineName("outer"), start = CoroutineStart.ATOMIC) {
expect(2)
- launch(start = CoroutineStart.ATOMIC) {
- expect(3)
- launch(start = CoroutineStart.ATOMIC) {
+ launch(CoroutineName("nested"), start = CoroutineStart.ATOMIC) {
+ expect(4)
+ launch(CoroutineName("nested2"), start = CoroutineStart.ATOMIC) {
// This child attaches to the parent and throws after parent completion
- expect(4)
+ expect(6)
throw NullPointerException()
}
-
+ expect(5)
throw IOException()
}
-
+ expect(3)
throw ArithmeticException()
}
job.join()
- finish(5)
+ finish(7)
}
- assertEquals(1, exceptions.size)
+ assertEquals(1, exceptions.size, "Found $exceptions")
val exception = exceptions[0]
+ assertTrue(exception is ArithmeticException, "Exception is $exception")
val suppressed = exception.suppressed()
- checkException<IOException>(suppressed[0])
- checkException<NullPointerException>(suppressed[1])
+ // the order of suppressed exceptions here is a bit hacky, may change in the future
+ checkException<NullPointerException>(suppressed[0])
+ checkException<IOException>(suppressed[1])
checkCycles(exception)
}
}
diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
index b2809fb..b494563 100644
--- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
+++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt
@@ -160,7 +160,7 @@
} catch (e: Throwable) {
// unwrap original cause from ExecutionException
val original = (e as? ExecutionException)?.cause ?: e
- CompletableDeferred<T>().also { it.completeExceptionally(original) }
+ CompletableDeferred<T>().also { it.cancel(original) }
}
}
val result = CompletableDeferred<T>()
@@ -168,7 +168,7 @@
if (exception == null) {
result.complete(value)
} else {
- result.completeExceptionally(exception)
+ result.cancel(exception)
}
}
if (this is Future<*>) result.cancelFutureOnCompletion(this)
diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
index 7f125f7..0b0006e 100644
--- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
+++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
@@ -277,7 +277,7 @@
}
val deferred = future.asDeferred()
- assertTrue(deferred.isCompletedExceptionally)
+ assertTrue(deferred.isCancelled)
val completionException = deferred.getCompletionExceptionOrNull()!!
assertTrue(completionException is TestException)
assertEquals("something went wrong", completionException.message)
@@ -306,7 +306,7 @@
deferred.await()
fail("deferred.await() should throw an exception")
} catch (e: Exception) {
- assertTrue(deferred.isCompletedExceptionally)
+ assertTrue(deferred.isCancelled)
assertTrue(e is CompletionException) // that's how supplyAsync wraps it
val cause = e.cause!!
assertTrue(cause is TestException)
diff --git a/js/kotlinx-coroutines-core-js/src/internal/Concurrent.kt b/js/kotlinx-coroutines-core-js/src/internal/Concurrent.kt
index c4f3d7f..a8b541a 100644
--- a/js/kotlinx-coroutines-core-js/src/internal/Concurrent.kt
+++ b/js/kotlinx-coroutines-core-js/src/internal/Concurrent.kt
@@ -14,3 +14,5 @@
}
internal actual fun <E> subscriberList(): SubscribersList<E> = CopyOnWriteList()
+
+internal actual fun <E> identitySet(expectedSize: Int): MutableSet<E> = HashSet(expectedSize)
diff --git a/js/kotlinx-coroutines-core-js/test/TestBase.kt b/js/kotlinx-coroutines-core-js/test/TestBase.kt
index 473f279..7873036 100644
--- a/js/kotlinx-coroutines-core-js/test/TestBase.kt
+++ b/js/kotlinx-coroutines-core-js/test/TestBase.kt
@@ -81,7 +81,6 @@
!unhandled[exCount - 1](e) ->
printError("Unhandled exception was unexpected: $e", e)
}
- context[Job]?.cancel(e)
}).catch { e ->
ex = e
if (expected != null) {
diff --git a/native/kotlinx-coroutines-core-native/src/internal/Concurrent.kt b/native/kotlinx-coroutines-core-native/src/internal/Concurrent.kt
index 13a6d6a..ed872b3 100644
--- a/native/kotlinx-coroutines-core-native/src/internal/Concurrent.kt
+++ b/native/kotlinx-coroutines-core-native/src/internal/Concurrent.kt
@@ -14,3 +14,5 @@
}
internal actual fun <E> subscriberList(): MutableList<E> = CopyOnWriteList<E>()
+
+internal actual fun <E> identitySet(expectedSize: Int): MutableSet<E> = HashSet()
diff --git a/native/kotlinx-coroutines-core-native/test/TestBase.kt b/native/kotlinx-coroutines-core-native/test/TestBase.kt
index 7f75e44..81e680b 100644
--- a/native/kotlinx-coroutines-core-native/test/TestBase.kt
+++ b/native/kotlinx-coroutines-core-native/test/TestBase.kt
@@ -76,7 +76,6 @@
!unhandled[exCount - 1](e) ->
printError("Unhandled exception was unexpected: $e", e)
}
- context[Job]?.cancel(e)
})
} catch (e: Throwable) {
ex = e