Fixed initialization of Job with parent (initParentJob), fixed handling of uncaught exceptions in standalone coroutines
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt
index 7e7da37..631c06a 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt
@@ -9,10 +9,8 @@
* It stores the result of continuation in the state of the job.
*/
@Suppress("LeakingThis")
-public abstract class AbstractCoroutine<in T>(
- parentContext: CoroutineContext
-) : JobSupport(parentContext[Job]), Continuation<T> {
- override val context: CoroutineContext = parentContext + this // mixes this job into this context
+public abstract class AbstractCoroutine<in T>(parentContext: CoroutineContext) : JobSupport(), Continuation<T> {
+ override val context: CoroutineContext = parentContext + this // merges this job into this context
final override fun resume(value: T) {
while (true) { // lock-free loop on state
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 72154b4..0a0409a 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -60,15 +60,19 @@
private class StandaloneCoroutine(
val parentContext: CoroutineContext
) : AbstractCoroutine<Unit>(parentContext) {
+ init { initParentJob(parentContext[Job]) }
+
override fun afterCompletion(state: Any?) {
// note the use of the parent context below!
- if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
+ if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.cancelReason)
}
}
private class BlockingCoroutine<T>(parentContext: CoroutineContext) : AbstractCoroutine<T>(parentContext) {
val blockedThread: Thread = Thread.currentThread()
+ init { initParentJob(parentContext[Job]) }
+
override fun afterCompletion(state: Any?) {
LockSupport.unpark(blockedThread)
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 9723585..2895b4c 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -17,11 +17,11 @@
/**
* Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
- * the [block].
+ * the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
*/
public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
- suspendCoroutineOrReturn { c ->
- val safe = SafeCancellableContinuation(c)
+ suspendCoroutineOrReturn { cont ->
+ val safe = SafeCancellableContinuation(cont, getParentJobOrAbort(cont))
block(safe)
safe.getResult()
}
@@ -29,12 +29,23 @@
// --------------- implementation details ---------------
@PublishedApi
+internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
+ val job = cont.context[Job]
+ // fast path when parent job is already complete (we don't even construct SafeCancellableContinuation object)
+ job?.isActive?.let { if (!it) throw CancellationException() }
+ return job
+}
+
+@PublishedApi
internal class SafeCancellableContinuation<in T>(
- private val delegate: Continuation<T>
+ private val delegate: Continuation<T>,
+ parentJob: Job?
) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {
// only updated from the thread that invoked suspendCancellableCoroutine
private var suspendedThread: Thread? = Thread.currentThread()
+ init { initParentJob(parentJob) }
+
fun getResult(): Any? {
if (suspendedThread != null) {
suspendedThread = null
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt
index 1e4ee7d..3400772 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt
@@ -7,9 +7,10 @@
* Helper function for coroutine builder implementations to handle uncaught exception in coroutines.
* It tries to handle uncaught exception in the following way:
* * If there is [CoroutineExceptionHandler] in the context, then it is used.
+ * * Otherwise, if exception is [CancellationException] then it is ignored
+ * (because that is the intended mechanism to cancel the running coroutine)
* * Otherwise, if there is a [Job] in the context, then [Job.cancel] is invoked and if it
* returns `true` (it was still active), then the exception is considered to be handled.
- * * Otherwise, if exception is [CancellationException] then it is ignored.
* * Otherwise, current thread's [Thread.uncaughtExceptionHandler] is used.
*/
fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
@@ -17,10 +18,10 @@
it.handleException(context, exception)
return
}
- // quit if successfully pushed exception as cancellation cancelReason
- if (context[Job]?.cancel(exception) ?: false) return
// ignore CancellationException (they are normal means to terminate a coroutine)
if (exception is CancellationException) return
+ // quit if successfully pushed exception as cancellation reason
+ if (context[Job]?.cancel(exception) ?: false) return
// otherwise just use thread's handler
val currentThread = Thread.currentThread()
currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index 7d24d2d..59f36cc 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -39,6 +39,8 @@
private class DeferredCoroutine<T>(
parentContext: CoroutineContext
) : AbstractCoroutine<T>(parentContext), Deferred<T> {
+ init { initParentJob(parentContext[Job]) }
+
@Suppress("UNCHECKED_CAST")
suspend override fun await(): T {
// quick check if already complete (avoid extra object creation)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index b0f0c86..acfc634 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -2,7 +2,6 @@
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
-import java.util.concurrent.CancellationException
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
import kotlin.coroutines.AbstractCoroutineContextElement
@@ -28,7 +27,7 @@
/**
* Creates new job object. It is optionally a child of a [parent] job.
*/
- public operator fun invoke(parent: Job? = null): Job = JobSupport(parent)
+ public operator fun invoke(parent: Job? = null): Job = JobImpl(parent)
}
/**
@@ -67,9 +66,12 @@
}
}
-typealias CompletionHandler = (Throwable?) -> Unit
+public typealias CompletionHandler = (Throwable?) -> Unit
-typealias CancellationException = CancellationException
+/**
+ * Thrown by cancellable suspending functions if the [Job] of the coroutine is cancelled while it is suspended.
+ */
+public typealias CancellationException = java.util.concurrent.CancellationException
/**
* Unregisters a specified [registration] when this job is complete.
@@ -118,15 +120,13 @@
* state and mare store addition state information for completed jobs, like their result values.
*/
@Suppress("LeakingThis")
-public open class JobSupport(
- parent: Job? = null
-) : AbstractCoroutineContextElement(Job), Job {
+public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
// keeps a stack of cancel listeners or a special CANCELLED, other values denote completed scope
@Volatile
private var state: Any? = ActiveList() // will drop the list on cancel
- // directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
- private val registration: Job.Registration? = parent?.onCompletion(CancelOnCompletion(parent, this))
+ @Volatile
+ private var registration: Job.Registration? = null
protected companion object {
@JvmStatic
@@ -134,6 +134,17 @@
AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "state")
}
+ // invoke at most once, after construction and after all other initialization is complete
+ protected fun initParentJob(parent: Job?) {
+ if (parent == null) return
+ check(registration == null)
+ // directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
+ val newRegistration = parent.onCompletion(CancelOnCompletion(parent, this))
+ registration = newRegistration
+ // now check our state _after_ registering (see updateState order of actions)
+ if (state !is Active) newRegistration.unregister()
+ }
+
protected fun getState(): Any? = state
protected fun updateState(expect: Any, update: Any?): Boolean {
@@ -141,7 +152,7 @@
require(update !is Active) // only active -> inactive transition is allowed
if (!STATE.compareAndSet(this, expect, update)) return false
// #1. Unregister from parent job
- registration?.unregister()
+ registration?.unregister() // volatile read registration _after_ state was updated
// #2 Invoke completion handlers
val reason = (update as? CompletedExceptionally)?.cancelReason
var completionException: Throwable? = null
@@ -202,21 +213,26 @@
private class ActiveList : LockFreeLinkedListHead(), Active
protected abstract class CompletedExceptionally {
- abstract val cancelReason: Throwable?
- abstract val exception: Throwable
+ abstract val cancelReason: Throwable // original reason or fresh CancellationException
+ abstract val exception: Throwable // the exception to be thrown in continuation
}
- protected class Cancelled(override val cancelReason: Throwable?) : CompletedExceptionally() {
+ protected class Cancelled(specifiedReason: Throwable?) : CompletedExceptionally() {
+ @Volatile
+ private var _cancelReason = specifiedReason // materialize CancellationException on first need
+
+ override val cancelReason: Throwable get() =
+ _cancelReason ?: // atomic read volatile var or else create new
+ CancellationException().also { _cancelReason = it }
+
@Volatile
private var _exception: Throwable? = null // convert reason to CancellationException on first need
+
override val exception: Throwable get() =
- _exception ?: // atomic read volatile var or else
- run {
- val result = cancelReason as? CancellationException ?:
- CancellationException().apply { if (cancelReason != null) initCause(cancelReason) }
- _exception = result
- result
- }
+ _exception ?: // atomic read volatile var or else build new
+ (cancelReason as? CancellationException ?:
+ CancellationException(cancelReason.message).apply { initCause(cancelReason) })
+ .also { _exception = it }
}
protected class Failed(override val exception: Throwable) : CompletedExceptionally() {
@@ -288,3 +304,7 @@
override fun invoke(reason: Throwable?) { node.remove() }
override fun toString() = "RemoveOnCompletion[$node]"
}
+
+private class JobImpl(parent: Job? = null) : JobSupport() {
+ init { initParentJob(parent) }
+}
\ No newline at end of file