blob: 45352c4f776248e3164d1203f583dce2a40addb1 [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
import kotlinx.coroutines.experimental.internal.OpDescriptor
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
import kotlinx.coroutines.experimental.selects.SelectBuilder
import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlinx.coroutines.experimental.selects.select
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.CoroutineContext
// --------------- core job interfaces ---------------
/**
* A background job. Conceptually, a job is a cancellable thing with a simple life-cycle that
* culminates in its completion. Jobs can be arranged into parent-child hierarchies where cancellation
* or completion of parent immediately cancels all its children.
*
* The most basic instances of [Job] are created with [launch] coroutine builder or with a
* `Job()` factory function. Other coroutine builders and primitives like
* [Deferred] also implement [Job] interface.
*
* A job has the following states:
*
* | **State** | [isActive] | [isCompleted] | [isCancelled] |
* | --------------------------------------- | ---------- | ------------- | ------------- |
* | _New_ (optional initial state) | `false` | `false` | `false` |
* | _Active_ (default initial state) | `true` | `false` | `false` |
* | _Cancelling_ (optional transient state) | `false` | `false` | `true` |
* | _Cancelled_ (final state) | `false` | `true` | `true` |
* | _Completed normally_ (final state) | `false` | `true` | `false` |
*
* Usually, a job is created in _active_ state (it is created and started). However, coroutine builders
* that provide an optional `start` parameter create a coroutine in _new_ state when this parameter is set to
* [CoroutineStart.LAZY]. Such a job can be made _active_ by invoking [start] or [join].
*
* A job can be _cancelled_ at any time with [cancel] function that forces it to transition to
* _cancelling_ state immediately. Simple jobs, that are not backed by a coroutine, like
* [CompletableDeferred] and the result of `Job()` factory function, don't
* have a _cancelling_ state, but become _cancelled_ on [cancel] immediately.
* Coroutines, on the other hand, become _cancelled_ only when they finish executing their code.
*
* ```
* +-----+ start +--------+ complete +-----------+
* | New | ---------------> | Active | -----------> | Completed |
* +-----+ +--------+ | normally |
* | | +-----------+
* | cancel | cancel
* V V
* +-----------+ finish +------------+
* | Cancelled | <--------- | Cancelling |
* |(completed)| +------------+
* +-----------+
* ```
*
* A job in the coroutine [context][CoroutineScope.context] represents the coroutine itself.
* A job is active while the coroutine is working and job's cancellation aborts the coroutine when
* the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException]
* or the cancellation cause inside the coroutine.
*
* A job can have a _parent_ job. A job with a parent is cancelled when its parent is cancelled or completes.
*
* All functions on this interface and on all interfaces derived from it are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
*/
public interface Job : CoroutineContext.Element {
/**
* Key for [Job] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<Job> {
/**
* Creates a new job object in _active_ state.
* It is optionally a child of a [parent] job.
* @suppress **Deprecated**
*/
@Deprecated("Replaced with top-level function", level = DeprecationLevel.HIDDEN)
public operator fun invoke(parent: Job? = null): Job = Job(parent)
init {
/*
* Here we make sure that CoroutineExceptionHandler is always initialized in advance, so
* that if a coroutine fails due to StackOverflowError we don't fail to report this error
* trying to initialize CoroutineExceptionHandler
*/
CoroutineExceptionHandler
}
}
// ------------ state query ------------
/**
* Returns `true` when this job is active -- it was already started and has not completed or cancelled yet.
*/
public val isActive: Boolean
/**
* Returns `true` when this job has completed for any reason. A job that was cancelled and has
* finished its execution is also considered complete.
*/
public val isCompleted: Boolean
/**
* Returns `true` if this job was [cancelled][cancel]. In the general case, it does not imply that the
* job has already [completed][isCompleted] (it may still be cancelling whatever it was doing).
*/
public val isCancelled: Boolean
/**
* Returns the exception that signals the completion of this job -- it returns the original
* [cancel] cause or an instance of [CancellationException] if this job had completed
* normally or was cancelled without a cause. This function throws
* [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
* [isCancelled] yet.
*
* The [cancellable][suspendCancellableCoroutine] suspending functions throw this exception
* when trying to suspend in the context of this job.
*/
public fun getCompletionException(): Throwable
// ------------ state update ------------
/**
* Starts coroutine related to this job (if any) if it was not started yet.
* The result `true` if this invocation actually started coroutine or `false`
* if it was already started or completed.
*/
public fun start(): Boolean
/**
* Cancel this job with an optional cancellation [cause]. The result is `true` if this job was
* cancelled as a result of this invocation and `false` otherwise
* (if it was already _completed_ or if it is [NonCancellable]).
* Repeated invocations of this function have no effect and always produce `false`.
*
* When cancellation has a clear reason in the code, an instance of [CancellationException] should be created
* at the corresponding original cancellation site and passed into this method to aid in debugging by providing
* both the context of cancellation and text description of the reason.
*/
public fun cancel(cause: Throwable? = null): Boolean
// ------------ state waiting ------------
/**
* Suspends coroutine until this job is complete. This invocation resumes normally (without exception)
* when the job is complete for any reason. This function also [starts][Job.start] the corresponding coroutine
* if the [Job] was still in _new_ state.
*
* This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
* suspending function is suspended, this function immediately resumes with [CancellationException].
*
* This function can be used in [select] invocation with [onJoin][SelectBuilder.onJoin] clause.
* Use [isCompleted] to check for completion of this job without waiting.
*/
public suspend fun join()
// ------------ low-level state-notification ------------
/**
* Registers handler that is **synchronously** invoked once on completion of this job.
* When job is already complete, then the handler is immediately invoked
* with a job's cancellation cause or `null`. Otherwise, handler will be invoked once when this
* job is complete.
*
* The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the
* registration of this handler and release its memory if its invocation is no longer needed.
* There is no need to dispose the handler after completion of this job. The references to
* all the handlers are released when this job completes.
*
* Note, that the handler is not invoked on invocation of [cancel] when
* job becomes _cancelling_, but only when the corresponding coroutine had finished execution
* of its code and became _cancelled_. There is an overloaded version of this function
* with `onCancelling` parameter to receive notification on _cancelling_ state.
*
* **Note**: This function is a part of internal machinery that supports parent-child hierarchies
* and allows for implementation of suspending functions that wait on the Job's state.
* This function should not be used in general application code.
* Implementations of `CompletionHandler` must be fast and _lock-free_.
*/
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
/**
* Registers handler that is **synchronously** invoked once on cancellation or completion of this job.
* When job is already complete, then the handler is immediately invoked
* with a job's cancellation cause or `null`. Otherwise, handler will be invoked once when this
* job is cancelled or complete.
*
* Invocation of this handler on a transition to a transient _cancelling_ state
* is controlled by [onCancelling] boolean parameter.
* The handler is invoked on invocation of [cancel] when
* job becomes _cancelling_ when [onCancelling] parameters is set to `true`. However,
* when this [Job] is not backed by a coroutine, like [CompletableDeferred] or [CancellableContinuation]
* (both of which do not posses a _cancelling_ state), then the value of [onCancelling] parameter is ignored.
*
* The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the
* registration of this handler and release its memory if its invocation is no longer needed.
* There is no need to dispose the handler after completion of this job. The references to
* all the handlers are released when this job completes.
*
* **Note**: This function is a part of internal machinery that supports parent-child hierarchies
* and allows for implementation of suspending functions that wait on the Job's state.
* This function should not be used in general application code.
* Implementations of `CompletionHandler` must be fast and _lock-free_.
*/
public fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle
// ------------ unstable internal API ------------
/**
* Registers [onJoin][SelectBuilder.onJoin] select clause.
* @suppress **This is unstable API and it is subject to change.**
*/
public fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R)
/**
* @suppress **Error**: Operator '+' on two Job objects is meaningless.
* Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts.
* The job to the right of `+` just replaces the job the left of `+`.
*/
@Suppress("DeprecatedCallableAddReplaceWith")
@Deprecated(message = "Operator '+' on two Job objects is meaningless. " +
"Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts. " +
"The job to the right of `+` just replaces the job the left of `+`.",
level = DeprecationLevel.ERROR)
public operator fun plus(other: Job) = other
/**
* Registration object for [invokeOnCompletion]. It can be used to [unregister] if needed.
* There is no need to unregister after completion.
* @suppress **Deprecated**: Replace with `DisposableHandle`
*/
@Deprecated(message = "Replace with `DisposableHandle`",
replaceWith = ReplaceWith("DisposableHandle"))
public interface Registration {
/**
* Unregisters completion handler.
* @suppress **Deprecated**: Replace with `dispose`
*/
@Deprecated(message = "Replace with `dispose`",
replaceWith = ReplaceWith("dispose()"))
public fun unregister()
}
}
/**
* Creates a new job object in an _active_ state.
* It is optionally a child of a [parent] job.
*/
public fun Job(parent: Job? = null): Job = JobImpl(parent)
/**
* A handle to an allocated object that can be disposed to make it eligible for garbage collection.
*/
@Suppress("DEPRECATION") // todo: remove when Job.Registration is removed
public interface DisposableHandle : Job.Registration {
/**
* Disposes the corresponding object, making it eligible for garbage collection.
* Repeated invocation of this function has no effect.
*/
public fun dispose()
/**
* Unregisters completion handler.
* @suppress **Deprecated**: Replace with `dispose`
*/
@Deprecated(message = "Replace with `dispose`",
replaceWith = ReplaceWith("dispose()"))
public override fun unregister() = dispose()
}
/**
* Handler for [Job.invokeOnCompletion].
*
* **Note**: This type is a part of internal machinery that supports parent-child hierarchies
* and allows for implementation of suspending functions that wait on the Job's state.
* This type should not be used in general application code.
* Implementations of `CompletionHandler` must be fast and _lock-free_.
*/
public typealias CompletionHandler = (Throwable?) -> Unit
/**
* Thrown by cancellable suspending functions if the [Job] of the coroutine is cancelled while it is suspending.
*/
public typealias CancellationException = java.util.concurrent.CancellationException
/**
* Unregisters a specified [registration] when this job is complete.
*
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
* invokeOnCompletion { registration.unregister() }
* ```
* @suppress: **Deprecated**: Renamed to `disposeOnCompletion`.
*/
@Deprecated(message = "Renamed to `disposeOnCompletion`",
replaceWith = ReplaceWith("disposeOnCompletion(registration)"))
public fun Job.unregisterOnCompletion(registration: DisposableHandle): DisposableHandle =
invokeOnCompletion(DisposeOnCompletion(this, registration))
/**
* Disposes a specified [handle] when this job is complete.
*
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
* invokeOnCompletion { handle.dispose() }
* ```
*/
public fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle =
invokeOnCompletion(DisposeOnCompletion(this, handle))
/**
* Cancels a specified [future] when this job is complete.
*
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
* invokeOnCompletion { future.cancel(false) }
* ```
*/
public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle =
invokeOnCompletion(CancelFutureOnCompletion(this, future))
/**
* @suppress **Deprecated**: `join` is now a member function of `Job`.
*/
@Suppress("EXTENSION_SHADOWED_BY_MEMBER", "DeprecatedCallableAddReplaceWith")
@Deprecated(message = "`join` is now a member function of `Job`")
public suspend fun Job.join() = this.join()
/**
* No-op implementation of [Job.Registration].
*/
@Deprecated(message = "Replace with `NonDisposableHandle`",
replaceWith = ReplaceWith("NonDisposableHandle"))
typealias EmptyRegistration = NonDisposableHandle
/**
* No-op implementation of [DisposableHandle].
*/
public object NonDisposableHandle : DisposableHandle {
/** Does not do anything. */
override fun dispose() {}
/** Returns "NonDisposableHandle" string. */
override fun toString(): String = "NonDisposableHandle"
}
// --------------- utility classes to simplify job implementation
/**
* A concrete implementation of [Job]. It is optionally a child to a parent job.
* This job is cancelled when the parent is complete, but not vise-versa.
*
* This is an open class designed for extension by more specific classes that might augment the
* state and mare store addition state information for completed jobs, like their result values.
*
* @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
* @suppress **This is unstable API and it is subject to change.**
*/
public open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(Job), Job {
/*
=== Internal states ===
name state class public state description
------ ------------ ------------ -----------
EMPTY_N EmptyNew : New no listeners
EMPTY_A EmptyActive : Active no listeners
SINGLE JobNode : Active a single listener
SINGLE+ JobNode : Active a single listener + NodeList added as its next
LIST_N NodeList : New a list of listeners (promoted once, does not got back to EmptyNew)
LIST_A NodeList : Active a list of listeners (promoted once, does not got back to JobNode/EmptyActive)
CANCELLING Cancelling : Cancelling(*) a list of listeners (promoted once)
FINAL_C Cancelled : Cancelled cancelled (final state)
FINAL_F Failed : Completed failed for other reason (final state)
FINAL_R <any> : Completed produced some result
=== Transitions ===
New states Active states Inactive states
+---------+ +---------+ +----------+
| EMPTY_N | --+-> | EMPTY_A | --+-> | FINAL_* |
+---------+ | +---------+ | +----------+
| | | ^ |
| | V | |
| | +---------+ |
| | | SINGLE | --+
| | +---------+ |
| | | |
| | V |
| | +---------+ |
| +-- | SINGLE+ | --+
| +---------+ |
| | |
V V |
+---------+ +---------+ |
| LIST_N | ----> | LIST_A | --+
+---------+ +---------+ |
| | |
| V |
| +------------+ |
+-------> | CANCELLING | --+
+------------+
This state machine and its transition matrix are optimized for the common case when job is created in active
state (EMPTY_A) and at most one completion listener is added to it during its life-time.
Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
(*) The CANCELLING state is used only in AbstractCoroutine class. A general Job (that does not
extend AbstractCoroutine) does not have CANCELLING state. It immediately transitions to
FINAL_C (Cancelled) state on cancellation/completion
*/
@Volatile
private var _state: Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
@Volatile
private var parentHandle: DisposableHandle? = null
protected companion object {
private val STATE: AtomicReferenceFieldUpdater<JobSupport, Any?> =
AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "_state")
fun stateToString(state: Any?): String =
if (state is Incomplete)
if (state.isActive) "Active" else "New"
else "Completed"
}
// ------------ initialization ------------
/**
* Initializes parent job.
* It shall be invoked at most once after construction after all other initialization.
*/
public fun initParentJob(parent: Job?) {
check(parentHandle == null)
if (parent == null) {
parentHandle = NonDisposableHandle
return
}
parent.start() // make sure the parent is started
// directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
val newRegistration = parent.invokeOnCompletion(ParentOnCancellation(parent), onCancelling = true)
parentHandle = newRegistration
// now check our state _after_ registering (see updateState order of actions)
if (isCompleted) newRegistration.dispose()
}
private inner class ParentOnCancellation(parent: Job) : JobCancellationNode<Job>(parent) {
override fun invokeOnce(reason: Throwable?) { onParentCancellation(reason) }
override fun toString(): String = "ParentOnCancellation[${this@JobSupport}]"
}
/**
* Invoked at most once on parent completion.
* @suppress **This is unstable API and it is subject to change.**
*/
protected open fun onParentCancellation(cause: Throwable?) {
// if parent was completed with CancellationException then use it as the cause of our cancellation, too.
// however, we shall not use application specific exceptions here. So if parent crashes due to IOException,
// we cannot and should not cancel the child with IOException
cancel(cause as? CancellationException)
}
// ------------ state query ------------
/**
* Returns current state of this job.
*/
protected val state: Any? get() {
while (true) { // helper loop on state (complete in-progress atomic operations)
val state = _state
if (state !is OpDescriptor) return state
state.perform(this)
}
}
protected inline fun lockFreeLoopOnState(block: (Any?) -> Unit): Nothing {
while (true) {
block(state)
}
}
public final override val isActive: Boolean get() {
val state = this.state
return state is Incomplete && state.isActive
}
public final override val isCompleted: Boolean get() = state !is Incomplete
public final override val isCancelled: Boolean get() {
val state = this.state
return state is Cancelled || state is Cancelling
}
// ------------ state update ------------
/**
* Updates current [state] of this job.
*/
protected fun updateState(expect: Any, proposedUpdate: Any?, mode: Int): Boolean {
val update = coerceProposedUpdate(expect, proposedUpdate)
if (!tryUpdateState(expect, update)) return false
completeUpdateState(expect, update, mode)
// if an exceptional completion was suppressed (because cancellation was in progress), then report it separately
if (proposedUpdate !== update && proposedUpdate is CompletedExceptionally && proposedUpdate.cause != null)
handleException(proposedUpdate.cause)
return true
}
// when Job is in Cancelling state, it can only be promoted to Cancelled state with the same cause
// however, null cause can be replaced with more specific CancellationException (that contains stack trace)
private fun coerceProposedUpdate(expect: Any, proposedUpdate: Any?): Any? =
if (expect is Cancelling && !correspondinglyCancelled(expect, proposedUpdate))
expect.cancelled else proposedUpdate
private fun correspondinglyCancelled(expect: Cancelling, proposedUpdate: Any?): Boolean {
if (proposedUpdate !is Cancelled) return false
return proposedUpdate.cause === expect.cancelled.cause ||
proposedUpdate.cause is CancellationException && expect.cancelled.cause == null
}
/**
* Tries to initiate update of the current [state] of this job.
*/
protected fun tryUpdateState(expect: Any, update: Any?): Boolean {
require(expect is Incomplete && update !is Incomplete) // only incomplete -> completed transition is allowed
if (!STATE.compareAndSet(this, expect, update)) return false
// Unregister from parent job
parentHandle?.dispose() // volatile read parentHandle _after_ state was updated
return true // continues in completeUpdateState
}
/**
* Completes update of the current [state] of this job.
*/
protected fun completeUpdateState(expect: Any, update: Any?, mode: Int) {
// Invoke completion handlers
val cause = (update as? CompletedExceptionally)?.cause
when (expect) {
is JobNode<*> -> try { // SINGLE/SINGLE+ state -- one completion handler (common case)
expect.invoke(cause)
} catch (ex: Throwable) {
handleException(ex)
}
is NodeList -> notifyCompletion(expect, cause) // LIST state -- a list of completion handlers
is Cancelling -> notifyCompletion(expect.list, cause) // has list, too
// otherwise -- do nothing (it was Empty*)
else -> check(expect is Empty)
}
// Do overridable processing after completion handlers
if (expect !is Cancelling) onCancellation() // only notify when was not cancelling before
afterCompletion(update, mode)
}
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
var exception: Throwable? = null
list.forEach<T> { node ->
try {
node.invoke(cause)
} catch (ex: Throwable) {
exception?.apply { addSuppressed(ex) } ?: run { exception = ex }
}
}
exception?.let { handleException(it) }
}
private fun notifyCompletion(list: NodeList, cause: Throwable?) =
notifyHandlers<JobNode<*>>(list, cause)
private fun notifyCancellation(list: NodeList, cause: Throwable?) =
notifyHandlers<JobCancellationNode<*>>(list, cause)
public final override fun start(): Boolean {
lockFreeLoopOnState { state ->
when (startInternal(state)) {
FALSE -> return false
TRUE -> return true
}
}
}
// returns: RETRY/FALSE/TRUE:
// FALSE when not new,
// TRUE when started
// RETRY when need to retry
private fun startInternal(state: Any?): Int {
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) return FALSE // already active
if (!STATE.compareAndSet(this, state, EmptyActive)) return RETRY
onStart()
return TRUE
}
is NodeList -> { // LIST -- a list of completion handlers (either new or active)
if (state.active != 0) return FALSE
if (!NodeList.ACTIVE.compareAndSet(state, 0, 1)) return RETRY
onStart()
return TRUE
}
else -> return FALSE // not a new state
}
}
/**
* Override to provide the actual [start] action.
*/
protected open fun onStart() {}
public final override fun getCompletionException(): Throwable {
val state = this.state
return when (state) {
is Cancelling -> state.cancelled.exception
is Incomplete -> error("Job was not completed or cancelled yet")
is CompletedExceptionally -> state.exception
else -> CancellationException("Job has completed normally")
}
}
/**
* Returns the cause that signals the completion of this job -- it returns the original
* [cancel] cause or **`null` if this job had completed
* normally or was cancelled without a cause**. This function throws
* [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
* [isCancelled] yet.
*/
protected fun getCompletionCause(): Throwable? {
val state = this.state
return when (state) {
is Cancelling -> state.cancelled.cause
is Incomplete -> error("Job was not completed or cancelled yet")
is CompletedExceptionally -> state.cause
else -> null
}
}
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
installHandler(handler, onCancelling = false)
public final override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle =
installHandler(handler, onCancelling = onCancelling && hasCancellingState)
private fun installHandler(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle {
var nodeCache: JobNode<*>? = null
lockFreeLoopOnState { state ->
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
// try move to SINGLE state
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (STATE.compareAndSet(this, state, node)) return node
} else
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
}
is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
promoteSingleToNodeList(state)
}
is NodeList -> { // LIST -- a list of completion handlers (either new or active)
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (addLastAtomic(state, state, node)) return node
}
is Cancelling -> { // CANCELLING -- has a list of completion handlers
if (onCancelling) { // installing cancellation handler on job that is being cancelled
handler((state as? CompletedExceptionally)?.exception)
return NonDisposableHandle
}
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (addLastAtomic(state, state.list, node)) return node
}
else -> { // is inactive
handler((state as? CompletedExceptionally)?.exception)
return NonDisposableHandle
}
}
}
}
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> =
if (onCancelling)
(handler as? JobCancellationNode<*>)?.also { require(it.job === this) }
?: InvokeOnCancellation(this, handler)
else
(handler as? JobNode<*>)?.also { require(it.job === this && (!hasCancellingState || it !is JobCancellationNode)) }
?: InvokeOnCompletion(this, handler)
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>) =
list.addLastIf(node) { this.state === expect }
private fun promoteEmptyToNodeList(state: Empty) {
// try to promote it to list in new state
STATE.compareAndSet(this, state, NodeList(state.isActive))
}
private fun promoteSingleToNodeList(state: JobNode<*>) {
// try to promote it to list (SINGLE+ state)
state.addOneIfEmpty(NodeList(active = true))
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
val list = state.next // either NodeList or somebody else won the race, updated state
// just attempt converting it to list if state is still the same, then we'll continue lock-free loop
STATE.compareAndSet(this, state, list)
}
final override suspend fun join() {
if (!joinInternal()) return // fast-path no wait
return joinSuspend() // slow-path wait
}
private fun joinInternal(): Boolean {
lockFreeLoopOnState { state ->
if (state !is Incomplete) return false // not active anymore (complete) -- no need to wait
if (startInternal(state) >= 0) return true // wait unless need to retry
}
}
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
cont.disposeOnCompletion(invokeOnCompletion(ResumeOnCompletion(this, cont)))
}
override fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R) {
// fast-path -- check state and select/return if needed
lockFreeLoopOnState { state ->
if (select.isSelected) return
if (state !is Incomplete) {
// already complete -- select result
if (select.trySelect(null))
block.startCoroutineUndispatched(select.completion)
return
}
if (startInternal(state) == 0) {
// slow-path -- register waiter for completion
select.disposeOnSelect(invokeOnCompletion(SelectJoinOnCompletion(this, select, block)))
return
}
}
}
internal fun removeNode(node: JobNode<*>) {
// remove logic depends on the state of the job
lockFreeLoopOnState { state ->
when (state) {
is JobNode<*> -> { // SINGE/SINGLE+ state -- one completion handler
if (state !== node) return // a different job node --> we were already removed
// try remove and revert back to empty state
if (STATE.compareAndSet(this, state, EmptyActive)) return
}
is NodeList, is Cancelling -> { // LIST or CANCELLING -- a list of completion handlers
// remove node from the list
node.remove()
return
}
else -> return // it is inactive or Empty* (does not have any completion handlers)
}
}
}
protected open val hasCancellingState: Boolean get() = false
public final override fun cancel(cause: Throwable?): Boolean =
if (hasCancellingState)
makeCancelling(cause) else
makeCancelled(cause)
// we will be dispatching coroutine to process its cancellation exception, so there is no need for
// an extra check for Job status in MODE_CANCELLABLE
private fun updateStateCancelled(state: Incomplete, cause: Throwable?) =
updateState(state, Cancelled(cause), mode = MODE_ATOMIC_DEFAULT)
// transitions to Cancelled state
private fun makeCancelled(cause: Throwable?): Boolean {
lockFreeLoopOnState { state ->
if (state !is Incomplete) return false // quit if already complete
if (updateStateCancelled(state, cause)) return true
}
}
// transitions to Cancelling state
private fun makeCancelling(cause: Throwable?): Boolean {
lockFreeLoopOnState { state ->
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
promoteEmptyToNodeList(state) // this way can wrap it into Cancelling on next pass
} else {
// cancelling a non-started coroutine makes it immediately cancelled
// (and we have no listeners to notify which makes it very simple)
if (updateStateCancelled(state, cause)) return true
}
}
is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
promoteSingleToNodeList(state)
}
is NodeList -> { // LIST -- a list of completion handlers (either new or active)
if (state.isActive) {
// try make it cancelling on the condition that we're still in this state
if (STATE.compareAndSet(this, state, Cancelling(state, Cancelled(cause)))) {
notifyCancellation(state, cause)
onCancellation()
return true
}
} else {
// cancelling a non-started coroutine makes it immediately cancelled
if (updateStateCancelled(state, cause))
return true
}
}
else -> { // is inactive or already cancelling
return false
}
}
}
}
/**
* Override to process any exceptions that were encountered while invoking completion handlers
* installed via [invokeOnCompletion].
*/
protected open fun handleException(exception: Throwable) {
throw exception
}
/**
* It is invoked once when job is cancelled or is completed, similarly to [invokeOnCompletion] with
* `onCancelling` set to `true`.
*/
protected open fun onCancellation() {}
/**
* Override for post-completion actions that need to do something with the state.
* @param mode completion mode.
*/
protected open fun afterCompletion(state: Any?, mode: Int) {}
// for nicer debugging
override fun toString(): String {
val state = this.state
val result = if (state is Incomplete) "" else "[$state]"
return "${this::class.java.simpleName}{${stateToString(state)}}$result@${Integer.toHexString(System.identityHashCode(this))}"
}
/**
* Interface for incomplete [state] of a job.
*/
public interface Incomplete {
val isActive: Boolean
}
private class Cancelling(
@JvmField val list: NodeList,
@JvmField val cancelled: Cancelled
) : Incomplete {
override val isActive: Boolean get() = false
}
private class NodeList(
active: Boolean
) : LockFreeLinkedListHead(), Incomplete {
@Volatile
@JvmField
var active: Int = if (active) 1 else 0
override val isActive: Boolean get() = active != 0
companion object {
@JvmField
val ACTIVE: AtomicIntegerFieldUpdater<NodeList> =
AtomicIntegerFieldUpdater.newUpdater(NodeList::class.java, "active")
}
override fun toString(): String = buildString {
append("List")
append(if (isActive) "{Active}" else "{New}")
append("[")
var first = true
this@NodeList.forEach<JobNode<*>> { node ->
if (first) first = false else append(", ")
append(node)
}
append("]")
}
}
/**
* Class for a [state] of a job that had completed exceptionally, including cancellation.
*
* @param cause the exceptional completion cause. If `cause` is null, then a [CancellationException]
* if created on first get from [exception] property.
*/
public open class CompletedExceptionally(
@JvmField val cause: Throwable?
) {
@Volatile
private var _exception: Throwable? = cause // materialize CancellationException on first need
/**
* Returns completion exception.
*/
public val exception: Throwable get() =
_exception ?: // atomic read volatile var or else create new
CancellationException("Job was cancelled").also { _exception = it }
override fun toString(): String = "${this::class.java.simpleName}[$exception]"
}
/**
* A specific subclass of [CompletedExceptionally] for cancelled jobs.
*/
public class Cancelled(
cause: Throwable?
) : CompletedExceptionally(cause)
/*
* =================================================================================================
* This is ready-to-use implementation for Deferred interface.
* However, it is not type-safe. Conceptually it just exposes the value of the underlying
* completed state as `Any?`
* =================================================================================================
*/
public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
protected fun getCompletedInternal(): Any? {
val state = this.state
check(state !is Incomplete) { "This job has not completed yet" }
if (state is CompletedExceptionally) throw state.exception
return state
}
protected suspend fun awaitInternal(): Any? {
// fast-path -- check state (avoid extra object creation)
while(true) { // lock-free loop on state
val state = this.state
if (state !is Incomplete) {
// already complete -- just return result
if (state is CompletedExceptionally) throw state.exception
return state
}
if (startInternal(state) >= 0) break // break unless needs to retry
}
return awaitSuspend() // slow-path
}
private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
cont.disposeOnCompletion(invokeOnCompletion {
val state = this.state
check(state !is Incomplete)
if (state is CompletedExceptionally)
cont.resumeWithException(state.exception)
else
cont.resume(state)
})
}
protected fun <R> registerSelectAwaitInternal(select: SelectInstance<R>, block: suspend (Any?) -> R) {
// fast-path -- check state and select/return if needed
lockFreeLoopOnState { state ->
if (select.isSelected) return
if (state !is Incomplete) {
// already complete -- select result
if (select.trySelect(null)) {
if (state is CompletedExceptionally)
select.resumeSelectCancellableWithException(state.exception)
else
block.startCoroutineUndispatched(state, select.completion)
}
return
}
if (startInternal(state) == 0) {
// slow-path -- register waiter for completion
select.disposeOnSelect(invokeOnCompletion(SelectAwaitOnCompletion(this, select, block)))
return
}
}
}
internal fun <R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (Any?) -> R) {
val state = this.state
// Note: await is non-atomic (can be cancelled while dispatched)
if (state is CompletedExceptionally)
select.resumeSelectCancellableWithException(state.exception)
else
block.startCoroutineCancellable(state, select.completion)
}
}
private const val RETRY = -1
private const val FALSE = 0
private const val TRUE = 1
private val EmptyNew = Empty(false)
private val EmptyActive = Empty(true)
private class Empty(override val isActive: Boolean) : JobSupport.Incomplete {
override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
}
private class JobImpl(parent: Job? = null) : JobSupport(true) {
init { initParentJob(parent) }
}
// -------- invokeOnCompletion nodes
internal abstract class JobNode<out J : Job>(
@JvmField val job: J
) : LockFreeLinkedListNode(), DisposableHandle, CompletionHandler, JobSupport.Incomplete {
final override val isActive: Boolean get() = true
final override fun dispose() = (job as JobSupport).removeNode(this)
override abstract fun invoke(reason: Throwable?)
}
private class InvokeOnCompletion(
job: Job,
private val handler: CompletionHandler
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) = handler.invoke(reason)
override fun toString() = "InvokeOnCompletion[${handler::class.java.name}@${Integer.toHexString(System.identityHashCode(handler))}]"
}
private class ResumeOnCompletion(
job: Job,
private val continuation: Continuation<Unit>
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) = continuation.resume(Unit)
override fun toString() = "ResumeOnCompletion[$continuation]"
}
internal class DisposeOnCompletion(
job: Job,
private val handle: DisposableHandle
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) = handle.dispose()
override fun toString(): String = "DisposeOnCompletion[$handle]"
}
private class CancelFutureOnCompletion(
job: Job,
private val future: Future<*>
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
future.cancel(false)
}
override fun toString() = "CancelFutureOnCompletion[$future]"
}
private class SelectJoinOnCompletion<R>(
job: JobSupport,
private val select: SelectInstance<R>,
private val block: suspend () -> R
) : JobNode<JobSupport>(job) {
override fun invoke(reason: Throwable?) {
if (select.trySelect(null))
block.startCoroutineCancellable(select.completion)
}
override fun toString(): String = "SelectJoinOnCompletion[$select]"
}
private class SelectAwaitOnCompletion<R>(
job: JobSupport,
private val select: SelectInstance<R>,
private val block: suspend (Any?) -> R
) : JobNode<JobSupport>(job) {
override fun invoke(reason: Throwable?) {
if (select.trySelect(null))
job.selectAwaitCompletion(select, block)
}
override fun toString(): String = "SelectAwaitOnCompletion[$select]"
}
// -------- invokeOnCancellation nodes
internal abstract class JobCancellationNode<out J : Job>(job: J) : JobNode<J>(job) {
// shall be invoked at most once, so here is an additional flag
@Volatile
private var invoked: Int = 0
private companion object {
private val INVOKED: AtomicIntegerFieldUpdater<JobCancellationNode<*>> = AtomicIntegerFieldUpdater
.newUpdater<JobCancellationNode<*>>(JobCancellationNode::class.java, "invoked")
}
final override fun invoke(reason: Throwable?) {
if (INVOKED.compareAndSet(this, 0, 1)) invokeOnce(reason)
}
abstract fun invokeOnce(reason: Throwable?)
}
private class InvokeOnCancellation(
job: Job,
private val handler: CompletionHandler
) : JobCancellationNode<Job>(job) {
override fun invokeOnce(reason: Throwable?) = handler.invoke(reason)
override fun toString() = "InvokeOnCancellation[${handler::class.java.name}@${Integer.toHexString(System.identityHashCode(handler))}]"
}