launched jobs and await can be cancelled while waiting in dispatch queue
* suspendAtomicCancellableCoroutine function is introduced for funs like
send/receive/receiveOrNull that require atomic cancellation
(they cannot be cancelled after decision was made)
* Coroutines started with default mode (CoroutineStart.DEFAULT) using
async/launch/actor builders can be cancelled before execution starts.
* CoroutineStart.ATOMIC is introduced as a start mode to specify that
coroutine cannot be cancelled before its execution starts.
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index e312f27..014f304 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -16,7 +16,6 @@
package kotlinx.coroutines.experimental
-import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
import java.util.concurrent.locks.LockSupport
import kotlin.coroutines.experimental.*
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
@@ -146,7 +145,7 @@
private val block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
override fun onStart() {
- block.startCoroutine(this, this)
+ block.startCoroutineCancellable(this, this)
}
}
@@ -158,7 +157,7 @@
private class RunContinuationCoroutine<in T>(
override val parentContext: CoroutineContext,
continuation: Continuation<T>
-) : CancellableContinuationImpl<T>(continuation, active = true)
+) : CancellableContinuationImpl<T>(continuation, defaultResumeMode = MODE_CANCELLABLE, active = true)
private class BlockingCoroutine<T>(
override val parentContext: CoroutineContext,
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Cancellable.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Cancellable.kt
new file mode 100644
index 0000000..8364761
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Cancellable.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import kotlin.coroutines.experimental.Continuation
+import kotlin.coroutines.experimental.intrinsics.createCoroutineUnchecked
+
+/**
+ * Use this function to start coroutine in a cancellable way, so that it can be cancelled
+ * while waiting to be dispatched.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
+ createCoroutineUnchecked(completion).resumeCancellable(Unit)
+
+/**
+ * Use this function to start coroutine in a cancellable way, so that it can be cancelled
+ * while waiting to be dispatched.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
+ createCoroutineUnchecked(receiver, completion).resumeCancellable(Unit)
+
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index cfabad1..f237965 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -20,8 +20,10 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.createCoroutine
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
+import kotlin.coroutines.experimental.intrinsics.createCoroutineUnchecked
import kotlin.coroutines.experimental.suspendCoroutine
// --------------- cancellable continuations ---------------
@@ -111,23 +113,43 @@
}
/**
- * Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
+ * Suspends coroutine similar to [suspendCoroutine], but provides an implementation of [CancellableContinuation] to
* the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
*
* If [holdCancellability] optional parameter is `true`, then the coroutine is suspended, but it is not
* cancellable until [CancellableContinuation.initCancellability] is invoked.
+ *
+ * See [suspendAtomicCancellableCoroutine] for suspending functions that need *atomic cancellation*.
*/
public inline suspend fun <T> suspendCancellableCoroutine(
holdCancellability: Boolean = false,
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineOrReturn { cont ->
- val cancellable = CancellableContinuationImpl(cont, active = true)
+ val cancellable = CancellableContinuationImpl(cont, defaultResumeMode = MODE_CANCELLABLE, active = true)
if (!holdCancellability) cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
+/**
+ * Suspends coroutine similar to [suspendCancellableCoroutine], but with *atomic cancellation*.
+ *
+ * When suspended function throws [CancellationException] it means that the continuation was not resumed.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when the continuation
+ * was already resumed and was posted for execution to the thread's queue.
+ */
+public inline suspend fun <T> suspendAtomicCancellableCoroutine(
+ holdCancellability: Boolean = false,
+ crossinline block: (CancellableContinuation<T>) -> Unit
+): T =
+ suspendCoroutineOrReturn { cont ->
+ val cancellable = CancellableContinuationImpl(cont, defaultResumeMode = MODE_ATOMIC_DEFAULT, active = true)
+ if (!holdCancellability) cancellable.initCancellability()
+ block(cancellable)
+ cancellable.getResult()
+ }
/**
* Removes a given node on cancellation.
@@ -149,14 +171,16 @@
override fun toString() = "RemoveOnCancel[$node]"
}
-internal const val MODE_DISPATCHED = 0
-internal const val MODE_UNDISPATCHED = 1
-internal const val MODE_DIRECT = 2
+@PublishedApi internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
+@PublishedApi internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
+@PublishedApi internal const val MODE_DIRECT = 2 // when the context is right just invoke the delegate continuation direct
+@PublishedApi internal const val MODE_UNDISPATCHED = 3 // when the thread is right, but need to mark it with current coroutine
@PublishedApi
internal open class CancellableContinuationImpl<in T>(
@JvmField
protected val delegate: Continuation<T>,
+ override val defaultResumeMode: Int,
active: Boolean
) : AbstractCoroutine<T>(active), CancellableContinuation<T> {
@Volatile
@@ -239,17 +263,19 @@
if (state is CompletedExceptionally) {
val exception = state.exception
when (mode) {
- MODE_DISPATCHED -> delegate.resumeWithException(exception)
- MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWithException(exception)
+ MODE_ATOMIC_DEFAULT -> delegate.resumeWithException(exception)
+ MODE_CANCELLABLE -> delegate.resumeCancellableWithException(exception)
MODE_DIRECT -> delegate.resumeDirectWithException(exception)
+ MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWithException(exception)
else -> error("Invalid mode $mode")
}
} else {
val value = getSuccessfulResult<T>(state)
when (mode) {
- MODE_DISPATCHED -> delegate.resume(value)
- MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatched(value)
+ MODE_ATOMIC_DEFAULT -> delegate.resume(value)
+ MODE_CANCELLABLE -> delegate.resumeCancellable(value)
MODE_DIRECT -> delegate.resumeDirect(value)
+ MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatched(value)
else -> error("Invalid mode $mode")
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
index e81d4ca..94f49c0 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
@@ -102,6 +102,28 @@
}
+// named class for ease of debugging, better stack traces, and fewer anonymous classes
+internal class DispatchTask<in T>(
+ private val dispatched: DispatchedContinuation<T>,
+ private val value: Any?, // T | Throwable
+ private val exception: Boolean,
+ private val cancellable: Boolean
+) : Runnable {
+ @Suppress("UNCHECKED_CAST")
+ override fun run() {
+ val job = if (cancellable) dispatched.context[Job] else null
+ when {
+ job != null && job.isCompleted ->
+ dispatched.resumeUndispatchedWithException(job.getCompletionException())
+ exception -> dispatched.resumeUndispatchedWithException(value as Throwable)
+ else -> dispatched.resumeUndispatched(value as T)
+ }
+ }
+
+ override fun toString(): String =
+ "DispatchTask[$value, cancellable=$cancellable, $dispatched]"
+}
+
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
@@ -109,13 +131,37 @@
override fun resume(value: T) {
val context = continuation.context
if (dispatcher.isDispatchNeeded(context))
- dispatcher.dispatch(context, Runnable {
- resumeUndispatched(value)
- })
+ dispatcher.dispatch(context, DispatchTask(this, value, exception = false, cancellable = false))
else
resumeUndispatched(value)
}
+ override fun resumeWithException(exception: Throwable) {
+ val context = continuation.context
+ if (dispatcher.isDispatchNeeded(context))
+ dispatcher.dispatch(context, DispatchTask(this, exception, exception = true, cancellable = false))
+ else
+ resumeUndispatchedWithException(exception)
+ }
+
+ @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
+ inline fun resumeCancellable(value: T) {
+ val context = continuation.context
+ if (dispatcher.isDispatchNeeded(context))
+ dispatcher.dispatch(context, DispatchTask(this, value, exception = false, cancellable = true))
+ else
+ resumeUndispatched(value)
+ }
+
+ @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
+ inline fun resumeCancellableWithException(exception: Throwable) {
+ val context = continuation.context
+ if (dispatcher.isDispatchNeeded(context))
+ dispatcher.dispatch(context, DispatchTask(this, exception, exception = true, cancellable = true))
+ else
+ resumeUndispatchedWithException(exception)
+ }
+
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatched(value: T) {
withCoroutineContext(context) {
@@ -123,16 +169,6 @@
}
}
- override fun resumeWithException(exception: Throwable) {
- val context = continuation.context
- if (dispatcher.isDispatchNeeded(context))
- dispatcher.dispatch(context, Runnable {
- resumeUndispatchedWithException(exception)
- })
- else
- resumeUndispatchedWithException(exception)
- }
-
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatchedWithException(exception: Throwable) {
withCoroutineContext(context) {
@@ -152,6 +188,18 @@
}
})
}
+
+ override fun toString(): String = "DispatchedContinuation[$dispatcher, $continuation]"
+}
+
+internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
+ is DispatchedContinuation -> resumeCancellable(value)
+ else -> resume(value)
+}
+
+internal fun <T> Continuation<T>.resumeCancellableWithException(exception: Throwable) = when (this) {
+ is DispatchedContinuation -> resumeCancellableWithException(exception)
+ else -> resumeWithException(exception)
}
internal fun <T> Continuation<T>.resumeDirect(value: T) = when (this) {
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
index e976f7e..79e2356 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
@@ -67,7 +67,7 @@
protected open fun createContext() = parentContext + this
- protected open val defaultResumeMode: Int get() = MODE_DISPATCHED
+ protected open val defaultResumeMode: Int get() = MODE_ATOMIC_DEFAULT
protected open val ignoreRepeatedResume: Boolean get() = false
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineStart.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineStart.kt
index 6389d53..01f7ed2 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineStart.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineStart.kt
@@ -35,6 +35,12 @@
*
* Note, that [Unconfined] dispatcher always returns `false` from its [CoroutineDispatcher.isDispatchNeeded]
* function, so starting coroutine with [Unconfined] dispatcher by [DEFAULT] is the same as using [UNDISPATCHED].
+ *
+ * If coroutine [Job] is cancelled before it even had a chance to start executing, then it will not start its
+ * execution at all, but complete with an exception.
+ *
+ * Cancellability of coroutine at suspension points depends on the particular implementation details of
+ * suspending functions. Use [suspendCancellableCoroutine] to implement cancellable suspending functions.
*/
DEFAULT,
@@ -43,10 +49,22 @@
*
* See the documentation for the corresponding coroutine builders for details:
* [launch], [async], and [actor][kotlinx.coroutines.experimental.channels.actor].
+ *
+ * If coroutine [Job] is cancelled before it even had a chance to start executing, then it will not start its
+ * execution at all, but complete with an exception.
*/
LAZY,
/**
+ * Atomically schedules the coroutine for execution according to its context. This is similar to [DEFAULT],
+ * but the coroutine cannot be cancelled before it starts executing.
+ *
+ * Cancellability of coroutine at suspension points depends on the particular implementation details of
+ * suspending functions as in [DEFAULT].
+ */
+ ATOMIC,
+
+ /**
* Immediately executes coroutine until its first suspension point _in the current thread_ as if it the
* coroutine was started using [Unconfined] dispatcher. However, when coroutine is resumed from suspension
* it is dispatched according to the [CoroutineDispatcher] in its context.
@@ -56,13 +74,15 @@
/**
* Starts the corresponding block as a coroutine with this coroutine start strategy.
*
- * * [DEFAULT] uses [startCoroutine].
+ * * [DEFAULT] uses [startCoroutineCancellable].
+ * * [ATOMIC] uses [startCoroutine].
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
* * [LAZY] does nothing.
*/
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
when (this) {
- CoroutineStart.DEFAULT -> block.startCoroutine(receiver, completion)
+ CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
+ CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index e36a74b..8f11b05 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -206,10 +206,11 @@
@Suppress("UNCHECKED_CAST")
internal fun <R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R, state: Any? = this.state) {
if (select.trySelect(idempotent = null)) {
+ // Note: await is non-atomic (can be cancelled while dispatched)
if (state is CompletedExceptionally)
- select.resumeSelectWithException(state.exception, MODE_DISPATCHED)
+ select.resumeSelectWithException(state.exception, MODE_CANCELLABLE)
else
- block.startCoroutine(state as T, select.completion)
+ block.startCoroutineCancellable(state as T, select.completion)
}
}
@@ -236,6 +237,6 @@
private val block: suspend CoroutineScope.() -> T
) : DeferredCoroutine<T>(parentContext, active = false) {
override fun onStart() {
- block.startCoroutine(this, this)
+ block.startCoroutineCancellable(this, this)
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index c6bf0bc..a1237ae 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -352,7 +352,11 @@
if (isCompleted) newRegistration.dispose()
}
- internal open fun onParentCompletion(cause: Throwable?) {
+ /**
+ * Invoked at most once on parent completion.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected open fun onParentCompletion(cause: Throwable?) {
// if parent was completed with CancellationException then use it as the cause of our cancellation, too.
// however, we shall not use application specific exceptions here. So if parent crashes due to IOException,
// we cannot and should not cancel the child with IOException
@@ -657,7 +661,9 @@
final override fun cancel(cause: Throwable?): Boolean {
while (true) { // lock-free loop on state
val state = this.state as? Incomplete ?: return false // quit if already complete
- if (updateState(state, Cancelled(state.idempotentStart, cause), mode = 0)) return true
+ // we are dispatching coroutine to process its cancellation exception, so there is no need for
+ // an extra check for Job status in MODE_CANCELLABLE
+ if (updateState(state, Cancelled(state.idempotentStart, cause), mode = MODE_ATOMIC_DEFAULT)) return true
}
}
@@ -670,6 +676,7 @@
/**
* Override for post-completion actions that need to do something with the state.
+ * @param mode completion mode.
*/
protected open fun afterCompletion(state: Any?, mode: Int) {}
@@ -766,6 +773,14 @@
idempotentStart: Any?,
cause: Throwable?
) : CompletedExceptionally(idempotentStart, cause)
+
+ private class ParentOnCompletion(
+ parentJob: Job,
+ private val subordinateJob: JobSupport
+ ) : JobNode<Job>(parentJob) {
+ override fun invoke(reason: Throwable?) { subordinateJob.onParentCompletion(reason) }
+ override fun toString(): String = "ParentOnCompletion[$subordinateJob]"
+ }
}
internal val ALREADY_SELECTED: Any = Symbol("ALREADY_SELECTED")
@@ -791,7 +806,7 @@
private class InvokeOnCompletion(
job: Job,
- @JvmField val handler: CompletionHandler
+ private val handler: CompletionHandler
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) = handler.invoke(reason)
override fun toString() = "InvokeOnCompletion[${handler::class.java.name}@${Integer.toHexString(System.identityHashCode(handler))}]"
@@ -799,7 +814,7 @@
private class ResumeOnCompletion(
job: Job,
- @JvmField val continuation: Continuation<Unit>
+ private val continuation: Continuation<Unit>
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) = continuation.resume(Unit)
override fun toString() = "ResumeOnCompletion[$continuation]"
@@ -807,23 +822,15 @@
internal class DisposeOnCompletion(
job: Job,
- @JvmField val handle: DisposableHandle
+ private val handle: DisposableHandle
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) = handle.dispose()
override fun toString(): String = "DisposeOnCompletion[$handle]"
}
-private class ParentOnCompletion(
- parentJob: Job,
- @JvmField val subordinateJob: JobSupport
-) : JobNode<Job>(parentJob) {
- override fun invoke(reason: Throwable?) { subordinateJob.onParentCompletion(reason) }
- override fun toString(): String = "ParentOnCompletion[$subordinateJob]"
-}
-
private class CancelFutureOnCompletion(
job: Job,
- @JvmField val future: Future<*>
+ private val future: Future<*>
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
@@ -840,7 +847,7 @@
) : JobNode<JobSupport>(job) {
override fun invoke(reason: Throwable?) {
if (select.trySelect(idempotent = null))
- block.startCoroutine(select.completion)
+ block.startCoroutineCancellable(select.completion)
}
override fun toString(): String = "SelectJoinOnCompletion[$select]"
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index fb0396e..e9bdfd0 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -170,7 +170,7 @@
}
}
- private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutine(true) sc@ { cont ->
+ private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
val send = SendElement(element, cont)
loop@ while (true) {
if (enqueueSend(send)) {
@@ -425,7 +425,7 @@
}
@Suppress("UNCHECKED_CAST")
- private suspend fun receiveSuspend(): E = suspendCancellableCoroutine(true) sc@ { cont ->
+ private suspend fun receiveSuspend(): E = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
val receive = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
while (true) {
if (enqueueReceive(receive)) {
@@ -473,7 +473,7 @@
}
@Suppress("UNCHECKED_CAST")
- private suspend fun receiveOrNullSuspend(): E? = suspendCancellableCoroutine(true) sc@ { cont ->
+ private suspend fun receiveOrNullSuspend(): E? = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
val receive = ReceiveElement(cont, nullOnClose = true)
while (true) {
if (enqueueReceive(receive)) {
@@ -662,7 +662,7 @@
return true
}
- private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutine(true) sc@ { cont ->
+ private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
val receive = ReceiveHasNext(this, cont)
while (true) {
if (channel.enqueueReceive(receive)) {
@@ -772,8 +772,11 @@
if (select.trySelect(idempotent = null)) {
if (closed.closeCause == null && nullOnClose) {
block.startCoroutine(null, select.completion)
- } else
- select.resumeSelectWithException(closed.receiveException, MODE_DISPATCHED)
+ } else {
+ // we are dispatching coroutine to process channel close on receive, which is an atomically
+ // cancellable suspending function, so use an atomic (non-cancellable) resume mode
+ select.resumeSelectWithException(closed.receiveException, MODE_ATOMIC_DEFAULT)
+ }
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
index 1b95716..bfca108 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
@@ -19,7 +19,6 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlin.coroutines.experimental.CoroutineContext
-import kotlin.coroutines.experimental.startCoroutine
/**
* Scope for [actor] coroutine builder.
@@ -109,7 +108,7 @@
override val channel: Channel<E> get() = this
override fun onStart() {
- block.startCoroutine(this, this)
+ block.startCoroutineCancellable(this, this)
}
suspend override fun send(element: E) {
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
index 5edf0ee..97b832d 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -54,8 +54,12 @@
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
- * Cancellation of suspended send is *atomic* -- when this function
+ *
+ * *Cancellation of suspended send is atomic* -- when this function
* throws [CancellationException] it means that the [element] was not sent to this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this send operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
@@ -121,8 +125,12 @@
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
- * Cancellation of suspended receive is *atomic* -- when this function
+ *
+ * *Cancellation of suspended receive is atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
@@ -139,8 +147,12 @@
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
- * Cancellation of suspended receive is *atomic* -- when this function
+ *
+ * *Cancellation of suspended receive is atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
@@ -193,8 +205,12 @@
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
- * Cancellation of suspended receive is *atomic* -- when this function
+ *
+ * *Cancellation of suspended receive is atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
@@ -209,8 +225,12 @@
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
- * Cancellation of suspended receive is *atomic* -- when this function
+ *
+ * *Cancellation of suspended receive is atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Undispatched.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Undispatched.kt
index e5b18c0..f6cf3b9 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Undispatched.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Undispatched.kt
@@ -16,8 +16,11 @@
package kotlinx.coroutines.experimental.intrinsics
+import kotlinx.coroutines.experimental.resumeCancellable
import kotlin.coroutines.experimental.Continuation
-import kotlin.coroutines.experimental.intrinsics.*
+import kotlin.coroutines.experimental.createCoroutine
+import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
+import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.experimental.suspendCoroutine
/**
@@ -26,7 +29,7 @@
* @suppress **This is unstable API and it is subject to change.**
*/
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "UNCHECKED_CAST")
-internal fun <R> (suspend () -> R).startCoroutineUndispatched(completion: Continuation<R>) {
+internal fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
val value = try {
startCoroutineUninterceptedOrReturn(completion)
} catch (e: Throwable) {
@@ -34,7 +37,7 @@
return
}
if (value !== COROUTINE_SUSPENDED)
- completion.resume(value as R)
+ completion.resume(value as T)
}
/**
@@ -43,13 +46,13 @@
* @suppress **This is unstable API and it is subject to change.**
*/
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "UNCHECKED_CAST")
-internal fun <E, R> (suspend (E) -> R).startCoroutineUndispatched(element: E, completion: Continuation<R>) {
+internal fun <R, T> (suspend (R) -> T).startCoroutineUndispatched(receiver: R, completion: Continuation<T>) {
val value = try {
- startCoroutineUninterceptedOrReturn(element, completion)
+ startCoroutineUninterceptedOrReturn(receiver, completion)
} catch (e: Throwable) {
completion.resumeWithException(e)
return
}
if (value !== COROUTINE_SUSPENDED)
- completion.resume(value as R)
+ completion.resume(value as T)
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
index fc75e17..b738101 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
@@ -160,8 +160,14 @@
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
- * Cancellation of suspended select is *atomic* -- when this function
- * throws [CancellationException] it means that no clause was selected.
+ *
+ * Atomicity of cancellation depends on the clause: [onSend][SelectBuilder.onSend], [onReceive][SelectBuilder.onReceive],
+ * [onReceiveOrNull][SelectBuilder.onReceiveOrNull], and [onLock][SelectBuilder.onLock] clauses are
+ * *atomically cancellable*. When select throws [CancellationException], it means that those clauses have not performed
+ * their respective operations.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this select operation
+ * was already resumed on an atomically cancellable clause and the continuation was posted for execution to the thread's queue.
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
@@ -185,7 +191,9 @@
@PublishedApi
internal class SelectBuilderImpl<in R>(
delegate: Continuation<R>
-) : CancellableContinuationImpl<R>(delegate, active = false), SelectBuilder<R>, SelectInstance<R> {
+) : CancellableContinuationImpl<R>(delegate, defaultResumeMode = MODE_DIRECT, active = false),
+ SelectBuilder<R>, SelectInstance<R>
+{
@PublishedApi
internal fun handleBuilderException(e: Throwable) {
if (trySelect(idempotent = null)) {
@@ -213,11 +221,9 @@
points.
*/
if (trySelect(null))
- cancel(cause)
+ super.onParentCompletion(cause)
}
- override val defaultResumeMode get() = MODE_DIRECT // all resumes through completion are dispatched directly
-
override val completion: Continuation<R> get() {
check(isSelected) { "Must be selected first" }
return this
@@ -263,7 +269,7 @@
// todo: we could have replaced startCoroutine with startCoroutineUndispatched
// But we need a way to know that Delay.invokeOnTimeout had used the right thread
if (trySelect(idempotent = null))
- block.startCoroutine(completion)
+ block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
}
val delay = context[ContinuationInterceptor] as? Delay
if (delay != null)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt
index 1a935f0..49ddfa6 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt
@@ -62,8 +62,12 @@
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
- * Cancellation of suspended lock invocation is *atomic* -- when this function
+ *
+ * *Cancellation of suspended lock invocation is atomic* -- when this function
* throws [CancellationException] it means that the mutex was not locked.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this lock operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
@@ -198,7 +202,7 @@
return lockSuspend(owner)
}
- private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutine<Unit>(holdCancellability = true) sc@ { cont ->
+ private suspend fun lockSuspend(owner: Any?) = suspendAtomicCancellableCoroutine<Unit>(holdCancellability = true) sc@ { cont ->
val waiter = LockCont(owner, cont)
while (true) { // lock-free loop on state
val state = this._state
@@ -220,7 +224,7 @@
check(curOwner !== owner) { "Already locked by $owner" }
if (state.addLastIf(waiter, { this._state === state })) {
// added to waiter list!
- cont.initCancellability()
+ cont.initCancellability() // make it properly cancellable
cont.removeOnCancel(waiter)
return@sc
}