blob: 36638781c27b2a8dfda986a9872808dea44d0aaa [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
import kotlin.coroutines.experimental.suspendCoroutine
// --------------- cancellable continuations ---------------
/**
* Cancellable continuation. Its job is _completed_ when it is resumed or cancelled.
* When [cancel] function is explicitly invoked, this continuation resumes with [CancellationException] or
* with the specified cancel cause.
*
* Cancellable continuation has three states:
*
* | **State** | [isActive] | [isCompleted] | [isCancelled] |
* | ----------------------------------- | ---------- | ------------- | ------------- |
* | _Active_ (initial state) | `true` | `false` | `false` |
* | _Resumed_ (final _completed_ state) | `false` | `true` | `false` |
* | _Canceled_ (final _completed_ state)| `false` | `true` | `true` |
*
* Invocation of [cancel] transitions this continuation from _active_ to _cancelled_ state, while
* invocation of [resume] or [resumeWithException] transitions it from _active_ to _resumed_ state.
*
* Invocation of [resume] or [resumeWithException] in _resumed_ state produces [IllegalStateException]
* but is ignored in _cancelled_ state.
*/
public interface CancellableContinuation<in T> : Continuation<T>, Job {
/**
* Returns `true` if this continuation was [cancelled][cancel].
*
* It implies that [isActive] is `false` and [isCompleted] is `true`.
*/
public val isCancelled: Boolean
/**
* Tries to resume this continuation with a given value and returns non-null object token if it was successful,
* or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
* [completeResume] must be invoked with it.
*
* When [idempotent] is not `null`, this function performs _idempotent_ operation, so that
* further invocations with the same non-null reference produce the same result.
*
* @suppress **This is unstable API and it is subject to change.**
*/
public fun tryResume(value: T, idempotent: Any? = null): Any?
/**
* Tries to resume this continuation with a given exception and returns non-null object token if it was successful,
* or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
* [completeResume] must be invoked with it.
*
* @suppress **This is unstable API and it is subject to change.**
*/
public fun tryResumeWithException(exception: Throwable): Any?
/**
* Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
*
* @suppress **This is unstable API and it is subject to change.**
*/
public fun completeResume(token: Any)
/**
* Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
* [suspendCancellableCoroutine] function. It throws [IllegalStateException] if invoked more than once.
*/
public fun initCancellability()
/**
* Resumes this continuation with a given [value] in the invoker thread without going though
* [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
* This function is designed to be used only by the [CoroutineDispatcher] implementations themselves.
* **It should not be used in general code**.
*
* The receiver [CoroutineDispatcher] of this function be equal to the context dispatcher or
* [IllegalArgumentException] if thrown.
*/
public fun CoroutineDispatcher.resumeUndispatched(value: T)
/**
* Resumes this continuation with a given [exception] in the invoker thread without going though
* [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
* This function is designed to be used only by the [CoroutineDispatcher] implementations themselves.
* **It should not be used in general code**.
*
* The receiver [CoroutineDispatcher] of this function be equal to the context dispatcher or
* [IllegalArgumentException] if thrown.
*/
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
}
/**
* Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
* the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
*
* If [holdCancellability] optional parameter is `true`, then the coroutine is suspended, but it is not
* cancellable until [CancellableContinuation.initCancellability] is invoked.
*/
public inline suspend fun <T> suspendCancellableCoroutine(
holdCancellability: Boolean = false,
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineOrReturn { cont ->
val cancellable = CancellableContinuationImpl(cont, active = true)
if (!holdCancellability) cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
/**
* Removes a given node on cancellation.
* @suppress **This is unstable API and it is subject to change.**
*/
public fun CancellableContinuation<*>.removeOnCancel(node: LockFreeLinkedListNode): Job.Registration =
invokeOnCompletion(RemoveOnCancel(this, node))
// --------------- implementation details ---------------
private class RemoveOnCancel(
cont: CancellableContinuation<*>,
val node: LockFreeLinkedListNode
) : JobNode<CancellableContinuation<*>>(cont) {
override fun invoke(reason: Throwable?) {
if (job.isCancelled)
node.remove()
}
override fun toString() = "RemoveOnCancel[$node]"
}
@PublishedApi
internal open class CancellableContinuationImpl<in T>(
@JvmField
protected val delegate: Continuation<T>,
active: Boolean
) : AbstractCoroutine<T>(active), CancellableContinuation<T> {
@Volatile
private var decision = UNDECIDED
override val parentContext: CoroutineContext
get() = delegate.context
protected companion object {
@JvmStatic
val DECISION: AtomicIntegerFieldUpdater<CancellableContinuationImpl<*>> =
AtomicIntegerFieldUpdater.newUpdater(CancellableContinuationImpl::class.java, "decision")
const val UNDECIDED = 0
const val SUSPENDED = 1
const val RESUMED = 2
const val MODE_UNDISPATCHED = 1
const val MODE_DIRECT = 2
@Suppress("UNCHECKED_CAST")
fun <T> getSuccessfulResult(state: Any?): T = if (state is CompletedIdempotentResult) state.result as T else state as T
}
override fun initCancellability() {
initParentJob(delegate.context[Job])
}
@PublishedApi
internal fun getResult(): Any? {
val decision = this.decision // volatile read
if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
// otherwise, afterCompletion was already invoked, and the result is in the state
val state = this.state
if (state is CompletedExceptionally) throw state.exception
return getSuccessfulResult(state)
}
override val isCancelled: Boolean get() = state is Cancelled
override fun tryResume(value: T, idempotent: Any?): Any? {
while (true) { // lock-free loop on state
val state = this.state // atomic read
when (state) {
is Incomplete -> {
val idempotentStart = state.idempotentStart
val update: Any? = if (idempotent == null && idempotentStart == null) value else
CompletedIdempotentResult(idempotentStart, idempotent, value, state)
if (tryUpdateState(state, update)) return state
}
is CompletedIdempotentResult -> {
if (state.idempotentResume === idempotent) {
check(state.result === value) { "Non-idempotent resume" }
return state.token
} else
return null
}
else -> return null // cannot resume -- not active anymore
}
}
}
override fun tryResumeWithException(exception: Throwable): Any? {
while (true) { // lock-free loop on state
val state = this.state // atomic read
when (state) {
is Incomplete -> {
if (tryUpdateState(state, CompletedExceptionally(state.idempotentStart, exception))) return state
}
else -> return null // cannot resume -- not active anymore
}
}
}
override fun completeResume(token: Any) {
completeUpdateState(token, state, mode = 0)
}
override fun afterCompletion(state: Any?, mode: Int) {
val decision = this.decision // volatile read
if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return // will get result in getResult
// otherwise, getResult has already commenced, i.e. it was resumed later or in other thread
if (state is CompletedExceptionally) {
val exception = state.exception
when (mode) {
0 -> delegate.resumeWithException(exception)
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWithException(exception)
MODE_DIRECT -> {
if (delegate is DispatchedContinuation)
delegate.continuation.resumeWithException(exception)
else
delegate.resumeWithException(exception)
}
else -> error("Invalid mode $mode")
}
} else {
val value = getSuccessfulResult<T>(state)
when (mode) {
0 -> delegate.resume(value)
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatched(value)
MODE_DIRECT -> {
if (delegate is DispatchedContinuation)
delegate.continuation.resume(value)
else
delegate.resume(value)
}
else -> error("Invalid mode $mode")
}
}
}
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
resume(value, MODE_UNDISPATCHED)
}
override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
resumeWithException(exception, MODE_UNDISPATCHED)
}
private class CompletedIdempotentResult(
idempotentStart: Any?,
@JvmField val idempotentResume: Any?,
@JvmField val result: Any?,
@JvmField val token: Incomplete
) : CompletedIdempotentStart(idempotentStart) {
override fun toString(): String = "CompletedIdempotentResult[$result]"
}
}