blob: 574060e84f869b256760bbc25a2abae5aa8262f5 [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental
import kotlinx.coroutines.experimental.internal.LinkedListHead
import kotlinx.coroutines.experimental.internal.LinkedListNode
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.buildSequence
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
/**
* A background job. Conceptually, a job is a cancellable thing with a simple life-cycle that
* culminates in its completion. Jobs can be arranged into parent-child hierarchies where cancellation
* or completion of parent immediately cancels all its [children].
*
* The most basic instances of [Job] are created with [launch] coroutine builder or with a
* `Job()` factory function. Other coroutine builders and primitives like
* [Deferred] also implement [Job] interface.
*
* A job has the following states:
*
* | **State** | [isActive] | [isCompleted] | [isCancelled] |
* | --------------------------------------- | ---------- | ------------- | ------------- |
* | _New_ (optional initial state) | `false` | `false` | `false` |
* | _Active_ (default initial state) | `true` | `false` | `false` |
* | _Completing_ (optional transient state) | `true` | `false` | `false` |
* | _Cancelling_ (optional transient state) | `false` | `false` | `true` |
* | _Cancelled_ (final state) | `false` | `true` | `true` |
* | _Completed_ (final state) | `false` | `true` | `false` |
*
* Usually, a job is created in _active_ state (it is created and started). However, coroutine builders
* that provide an optional `start` parameter create a coroutine in _new_ state when this parameter is set to
* [CoroutineStart.LAZY]. Such a job can be made _active_ by invoking [start] or [join].
*
* A job can be _cancelled_ at any time with [cancel] function that forces it to transition to
* _cancelling_ state immediately. Job that is not backed by a coroutine (see `Job()` function) and does not have
* [children] becomes _cancelled_ on [cancel] immediately.
* Otherwise, job becomes _cancelled_ when it finishes executing its code and
* when all its children [complete][isCompleted].
*
* ```
* wait children
* +-----+ start +--------+ complete +-------------+ finish +-----------+
* | New | ---------------> | Active | -----------> | Completing | -------> | Completed |
* +-----+ +--------+ +-------------+ +-----------+
* | | |
* | cancel | cancel | cancel
* V V |
* +-----------+ finish +------------+ |
* | Cancelled | <--------- | Cancelling | <----------------+
* |(completed)| +------------+
* +-----------+
* ```
*
* A job in the [coroutineContext][CoroutineScope.coroutineContext] represents the coroutine itself.
* A job is active while the coroutine is working and job's cancellation aborts the coroutine when
* the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException].
*
* A job can have a _parent_ job. A job with a parent is cancelled when its parent is cancelled or completes exceptionally.
* Parent job waits for all its children to complete in _completing_ or _cancelling_ state.
* _Completing_ state is purely internal to the job. For an outside observer a _completing_ job is still active,
* while internally it is waiting for its children.
*
* All functions on this interface and on all interfaces derived from it are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
*/
public actual interface Job : CoroutineContext.Element {
// ------------ state query ------------
/**
* Returns `true` when this job is active -- it was already started and has not completed or cancelled yet.
* The job that is waiting for its [children] to complete is still considered to be active if it
* was not cancelled.
*/
public actual val isActive: Boolean
/**
* Returns `true` when this job has completed for any reason. A job that was cancelled and has
* finished its execution is also considered complete. Job becomes complete only after
* all its [children] complete.
*/
public actual val isCompleted: Boolean
/**
* Returns `true` if this job was [cancelled][cancel]. In the general case, it does not imply that the
* job has already [completed][isCompleted] (it may still be cancelling whatever it was doing).
*/
public actual val isCancelled: Boolean
/**
* Returns [CancellationException] that signals the completion of this job. This function is
* used by [cancellable][suspendCancellableCoroutine] suspending functions. They throw exception
* returned by this function when they suspend in the context of this job and this job becomes _complete_.
*
* This function returns the original [cancel] cause of this job if that `cause` was an instance of
* [CancellationException]. Otherwise (if this job was cancelled with a cause of a different type, or
* was cancelled without a cause, or had completed normally), an instance of [JobCancellationException] is
* returned. The [JobCancellationException.cause] of the resulting [JobCancellationException] references
* the original cancellation cause that was passed to [cancel] function.
*
* This function throws [IllegalStateException] when invoked on a job that has not
* [completed][isCompleted] nor [cancelled][isCancelled] yet.
*/
public actual fun getCancellationException(): CancellationException
// ------------ state update ------------
/**
* Starts coroutine related to this job (if any) if it was not started yet.
* The result `true` if this invocation actually started coroutine or `false`
* if it was already started or completed.
*/
public actual fun start(): Boolean
/**
* Cancels this job with an optional cancellation [cause]. The result is `true` if this job was
* cancelled as a result of this invocation and `false` otherwise
* (if it was already _completed_ or if it is [NonCancellable]).
* Repeated invocations of this function have no effect and always produce `false`.
*
* When cancellation has a clear reason in the code, an instance of [CancellationException] should be created
* at the corresponding original cancellation site and passed into this method to aid in debugging by providing
* both the context of cancellation and text description of the reason.
*/
public actual fun cancel(cause: Throwable? = null): Boolean
// ------------ parent-child ------------
/**
* Returns a sequence of this job's children.
*
* A job becomes a child of this job when it is constructed with this job in its
* [CoroutineContext] or using an explicit `parent` parameter.
*
* A parent-child relation has the following effect:
*
* * Cancellation of parent with [cancel] or its exceptional completion (failure)
* immediately cancels all its children.
* * Parent cannot complete until all its children are complete. Parent waits for all its children to
* complete in _completing_ or _cancelling_ state.
* * Uncaught exception in a child, by default, cancels parent. In particular, this applies to
* children created with [launch] coroutine builder. Note, that [async] and other future-like
* coroutine builders do not have uncaught exceptions by definition, since all their exceptions are
* caught and are encapsulated in their result.
*/
public actual val children: Sequence<Job>
/**
* Attaches child job so that this job becomes its parent and
* returns a handle that should be used to detach it.
*
* A parent-child relation has the following effect:
* * Cancellation of parent with [cancel] or its exceptional completion (failure)
* immediately cancels all its children.
* * Parent cannot complete until all its children are complete. Parent waits for all its children to
* complete in _completing_ or _cancelling_ state.
*
* **A child must store the resulting [DisposableHandle] and [dispose][DisposableHandle.dispose] the attachment
* to its parent on its own completion.**
*
* Coroutine builders and job factory functions that accept `parent` [CoroutineContext] parameter
* lookup a [Job] instance in the parent context and use this function to attach themselves as a child.
* They also store a reference to the resulting [DisposableHandle] and dispose a handle when they complete.
*
* @suppress This is an internal API. This method is too error prone for public API.
*/
@Deprecated(message = "Start child coroutine with 'parent' parameter", level = DeprecationLevel.WARNING)
public actual fun attachChild(child: Job): DisposableHandle
// ------------ state waiting ------------
/**
* Suspends coroutine until this job is complete. This invocation resumes normally (without exception)
* when the job is complete for any reason and the [Job] of the invoking coroutine is still [active][isActive].
* This function also [starts][Job.start] the corresponding coroutine if the [Job] was still in _new_ state.
*
* Note, that the job becomes complete only when all its children are complete.
*
* This suspending function is cancellable and **always** checks for the cancellation of invoking coroutine's Job.
* If the [Job] of the invoking coroutine is cancelled or completed when this
* suspending function is invoked or while it is suspended, this function
* throws [CancellationException].
*
* In particular, it means that a parent coroutine invoking `join` on a child coroutine that was started using
* `launch(coroutineContext) { ... }` builder throws [CancellationException] if the child
* had crashed, unless a non-standard [CoroutineExceptionHandler] if installed in the context.
*
* There is [cancelAndJoin] function that combines an invocation of [cancel] and `join`.
*/
public actual suspend fun join()
// ------------ low-level state-notification ------------
/**
* Registers handler that is **synchronously** invoked once on cancellation or completion of this job.
* When job is already cancelling or complete, then the handler is immediately invoked
* with a job's cancellation cause or `null` unless [invokeImmediately] is set to false.
* Otherwise, handler will be invoked once when this job is cancelled or complete.
*
* Invocation of this handler on a transition to a transient _cancelling_ state
* is controlled by [onCancelling] boolean parameter.
* The handler is invoked on invocation of [cancel] when
* job becomes _cancelling_ if [onCancelling] parameter is set to `true`. However,
* when this [Job] is not backed by a coroutine, like [CompletableDeferred] or [CancellableContinuation]
* (both of which do not posses a _cancelling_ state), then the value of [onCancelling] parameter is ignored.
*
* The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the
* registration of this handler and release its memory if its invocation is no longer needed.
* There is no need to dispose the handler after completion of this job. The references to
* all the handlers are released when this job completes.
*
* Installed [handler] should not throw any exceptions. If it does, they will get caught,
* wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code.
*
* **Note**: This function is a part of internal machinery that supports parent-child hierarchies
* and allows for implementation of suspending functions that wait on the Job's state.
* This function should not be used in general application code.
* Implementations of `CompletionHandler` must be fast and _lock-free_.
*
* @param onCancelling when `true`, then the [handler] is invoked as soon as this job transitions to _cancelling_ state;
* when `false` then the [handler] is invoked only when it transitions to _completed_ state.
* @param invokeImmediately when `true` and this job is already in the desired state (depending on [onCancelling]),
* then the [handler] is immediately and synchronously invoked and [NonDisposableHandle] is returned;
* when `false` then [NonDisposableHandle] is returned, but the [handler] is not invoked.
* @param handler the handler.
*/
public actual fun invokeOnCompletion(
onCancelling: Boolean = false,
invokeImmediately: Boolean = true,
handler: CompletionHandler): DisposableHandle
/**
* Key for [Job] instance in the coroutine context.
*/
public actual companion object Key : CoroutineContext.Key<Job>
}
/**
* Creates a new job object in an _active_ state.
* It is optionally a child of a [parent] job.
*/
@Suppress("FunctionName")
public actual fun Job(parent: Job? = null): Job = JobImpl(parent)
/**
* A handle to an allocated object that can be disposed to make it eligible for garbage collection.
*/
public actual interface DisposableHandle {
/**
* Disposes the corresponding object, making it eligible for garbage collection.
* Repeated invocation of this function has no effect.
*/
public actual fun dispose()
}
// -------------------- CoroutineContext extensions --------------------
/**
* Cancels [Job] of this context with an optional cancellation [cause]. The result is `true` if the job was
* cancelled as a result of this invocation and `false` if there is no job in the context or if it was already
* cancelled or completed. See [Job.cancel] for details.
*/
public actual fun CoroutineContext.cancel(cause: Throwable? = null): Boolean =
this[Job]?.cancel(cause) ?: false
/**
* Cancels all children of the [Job] in this context with an optional cancellation [cause].
* It does not do anything if there is no job in the context or it has no children.
* See [Job.cancelChildren] for details.
*/
public actual fun CoroutineContext.cancelChildren(cause: Throwable? = null) {
this[Job]?.cancelChildren(cause)
}
// -------------------- Job extensions --------------------
/**
* Disposes a specified [handle] when this job is complete.
*
* This is a shortcut for the following code:
* ```
* invokeOnCompletion { handle.dispose() }
* ```
*/
public actual fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle =
invokeOnCompletion { handle.dispose() }
/**
* Cancels the job and suspends invoking coroutine until the cancelled job is complete.
*
* This suspending function is cancellable and **always** checks for the cancellation of invoking coroutine's Job.
* If the [Job] of the invoking coroutine is cancelled or completed when this
* suspending function is invoked or while it is suspended, this function
* throws [CancellationException].
*
* In particular, it means that a parent coroutine invoking `cancelAndJoin` on a child coroutine that was started using
* `launch(coroutineContext) { ... }` builder throws [CancellationException] if the child
* had crashed, unless a non-standard [CoroutineExceptionHandler] if installed in the context.
*
* This is a shortcut for the invocation of [cancel][Job.cancel] followed by [join][Job.join].
*/
public actual suspend fun Job.cancelAndJoin() {
cancel()
return join()
}
/**
* Cancels all [children][Job.children] jobs of this coroutine with the given [cause] using [Job.cancel]
* for all of them. Unlike [Job.cancel] on this job as a whole, the state of this job itself is not affected.
*/
public actual fun Job.cancelChildren(cause: Throwable? = null) {
children.forEach { it.cancel(cause) }
}
/**
* Suspends coroutine until all [children][Job.children] of this job are complete using
* [Job.join] for all of them. Unlike [Job.join] on this job as a whole, it does not wait until
* this job is complete.
*/
public actual suspend fun Job.joinChildren() {
children.forEach { it.join() }
}
/**
* No-op implementation of [DisposableHandle].
*/
public actual object NonDisposableHandle : DisposableHandle {
/** Does not do anything. */
actual override fun dispose() {}
/** Returns "NonDisposableHandle" string. */
override fun toString(): String = "NonDisposableHandle"
}
// --------------- helper classes to simplify job implementation
/**
* A concrete implementation of [Job]. It is optionally a child to a parent job.
* This job is cancelled when the parent is complete, but not vise-versa.
*
* This is an open class designed for extension by more specific classes that might augment the
* state and mare store addition state information for completed jobs, like their result values.
*
* @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
* @suppress **This is unstable API and it is subject to change.**
*/
public open class JobSupport(active: Boolean) : Job {
override val key: CoroutineContext.Key<*> get() = Job
// Note: use shared objects while we have no listeners
protected var state: Any? = if (active) EmptyActive else EmptyNew
private set
private var parentHandle: DisposableHandle? = null
// ------------ initialization ------------
/**
* Initializes parent job.
* It shall be invoked at most once after construction after all other initialization.
*/
public fun initParentJob(parent: Job?) {
check(parentHandle == null) { "Shall be invoked at most once" }
if (parent == null) {
parentHandle = NonDisposableHandle
return
}
parent.start() // make sure the parent is started
@Suppress("DEPRECATION")
val handle = parent.attachChild(this)
parentHandle = handle
// now check our state _after_ registering (see updateState order of actions)
if (isCompleted) {
handle.dispose()
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
}
// ------------ state query ------------
public final override val isActive: Boolean get() {
val state = this.state
return state is Incomplete && state.isActive
}
public final override val isCompleted: Boolean get() = state !is Incomplete
public final override val isCancelled: Boolean get() {
val state = this.state
return state is Cancelled || (state is Finishing && state.cancelled != null)
}
// ------------ state update ------------
/**
* Updates current [state] of this job.
* @suppress **This is unstable API and it is subject to change.**
*/
internal fun updateState(proposedUpdate: Any?, mode: Int) {
val state = this.state as Incomplete // current state must be incomplete
val update = coerceProposedUpdate(state, proposedUpdate)
tryUpdateState(update)
completeUpdateState(state, update, mode)
}
internal fun tryUpdateState(update: Any?) {
require(update !is Incomplete) // only incomplete -> completed transition is allowed
this.state = update
// Unregister from parent job
parentHandle?.let {
it.dispose()
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
}
// when Job is in Cancelling state, it can only be promoted to Cancelled state,
// so if the proposed Update is not an appropriate Cancelled (preserving the cancellation cause),
// then the corresponding Cancelled state is constructed.
private fun coerceProposedUpdate(expect: Incomplete, proposedUpdate: Any?): Any? =
if (expect is Finishing && expect.cancelled != null && !isCorrespondinglyCancelled(expect.cancelled, proposedUpdate))
createCancelled(expect.cancelled, proposedUpdate) else proposedUpdate
private fun isCorrespondinglyCancelled(cancelled: Cancelled, proposedUpdate: Any?): Boolean {
if (proposedUpdate !is Cancelled) return false
// NOTE: equality comparison of causes is performed here by design, see equals of JobCancellationException
return proposedUpdate.cause == cancelled.cause ||
proposedUpdate.cause is JobCancellationException && cancelled.cause == null
}
private fun createCancelled(cancelled: Cancelled, proposedUpdate: Any?): Cancelled {
if (proposedUpdate !is CompletedExceptionally) return cancelled // not exception -- just use original cancelled
val exception = proposedUpdate.exception
if (cancelled.exception == exception) return cancelled // that is the cancelled we need already!
//cancelled.cause?.let { exception.addSuppressed(it) }
return Cancelled(this, exception)
}
/**
* Completes update of the current [state] of this job.
* @suppress **This is unstable API and it is subject to change.**
*/
internal fun completeUpdateState(expect: Incomplete, update: Any?, mode: Int) {
// Invoke completion handlers
val exceptionally = update as? CompletedExceptionally
val cause = exceptionally?.cause
if (expect is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
try {
expect.invoke(cause)
} catch (ex: Throwable) {
handleException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
}
} else {
expect.list?.notifyCompletion(cause)
}
// Do overridable processing after completion handlers
if (!expect.isCancelling) onCancellation(exceptionally) // only notify when was not cancelling before
afterCompletion(update, mode)
}
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
var exception: Throwable? = null
list.forEach<T> { node ->
try {
node.invoke(cause)
} catch (ex: Throwable) {
exception?.apply { /* addSuppressed(ex) */ } ?: run {
exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
}
}
}
exception?.let { handleException(it) }
}
private fun NodeList.notifyCompletion(cause: Throwable?) =
notifyHandlers<JobNode<*>>(this, cause)
private fun notifyCancellation(list: NodeList, cause: Throwable?) =
notifyHandlers<JobCancellationNode<*>>(list, cause)
public final override fun start(): Boolean {
val state = this.state
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) return false // already active
this.state = EmptyActive
onStart()
return true
}
is NodeList -> { // LIST -- a list of completion handlers (either new or active)
return state.makeActive().also { result ->
if (result) onStart()
}
}
else -> return false // not a new state
}
}
/**
* Override to provide the actual [start] action.
*/
protected open fun onStart() {}
public final override fun getCancellationException(): CancellationException {
val state = this.state
return when {
state is Finishing && state.cancelled != null ->
state.cancelled.exception.toCancellationException("Job is being cancelled")
state is Incomplete ->
error("Job was not completed or cancelled yet: $this")
state is CompletedExceptionally ->
state.exception.toCancellationException("Job has failed")
else -> JobCancellationException("Job has completed normally", null, this)
}
}
private fun Throwable.toCancellationException(message: String): CancellationException =
this as? CancellationException ?: JobCancellationException(message, this, this@JobSupport)
/**
* Returns the cause that signals the completion of this job -- it returns the original
* [cancel] cause or **`null` if this job had completed
* normally or was cancelled without a cause**. This function throws
* [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
* [isCancelled] yet.
*/
protected fun getCompletionCause(): Throwable? {
val state = this.state
return when {
state is Finishing && state.cancelled != null -> state.cancelled.cause
state is Incomplete -> error("Job was not completed or cancelled yet")
state is CompletedExceptionally -> state.cause
else -> null
}
}
public final override fun invokeOnCompletion(onCancelling: Boolean, invokeImmediately: Boolean, handler: CompletionHandler) =
installNode(onCancelling, invokeImmediately, makeNode(handler, onCancelling))
private fun installNode(
onCancelling: Boolean,
invokeImmediately: Boolean,
node: JobNode<*>
): DisposableHandle {
while (true) {
val state = this.state
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
// move to SINGLE state
this.state = node
return node
} else
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
}
is Incomplete -> {
val list = state.list
if (list == null) { // SINGLE/SINGLE+
promoteSingleToNodeList(state as JobNode<*>)
} else {
if (state is Finishing && state.cancelled != null && onCancelling) {
// cannot be in this state unless were support cancelling state
check(onCancelMode != ON_CANCEL_MAKE_CANCELLED) // cannot be in this state unless were support cancelling state
// installing cancellation handler on job that is being cancelled
if (invokeImmediately) node.invoke(state.cancelled.cause)
return NonDisposableHandle
}
list.addLast(node)
return node
}
}
else -> { // is complete
if (invokeImmediately) node.invoke((state as? CompletedExceptionally)?.cause)
return NonDisposableHandle
}
}
}
}
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
val hasCancellingState = onCancelMode != ON_CANCEL_MAKE_CANCELLED
return if (onCancelling && hasCancellingState)
InvokeOnCancellation(this, handler)
else
InvokeOnCompletion(this, handler)
}
private fun promoteEmptyToNodeList(state: Empty) {
check(state === this.state) { "Expected empty state"}
// promote it to list in new state
this.state = NodeList(state.isActive)
}
private fun promoteSingleToNodeList(state: JobNode<*>) {
check(state === this.state) { "Expected single state" }
// promote it to list (SINGLE+ state)
val list = NodeList(isActive = true)
list.addLast(state)
this.state = list
}
final override suspend fun join() {
if (!joinInternal()) { // fast-path no wait
return suspendCoroutineOrReturn { cont ->
cont.context.checkCompletion()
Unit // do not suspend
}
}
return joinSuspend() // slow-path wait
}
private fun joinInternal(): Boolean {
if (state !is Incomplete) return false // not active anymore (complete) -- no need to wait
start()
return true // wait
}
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
val handle = invokeOnCompletion { cont.resume(Unit) }
cont.invokeOnCompletion { handle.dispose() }
}
internal fun removeNode(node: JobNode<*>) {
// remove logic depends on the state of the job
val state = this.state
when (state) {
is JobNode<*> -> { // SINGE/SINGLE+ state -- one completion handler
if (state !== node) return // a different job node --> we were already removed
// remove and revert back to empty state
this.state = EmptyActive
}
is Incomplete -> { // may have a list of completion handlers
// remove node from the list if there is a list
if (state.list != null) node.remove()
}
}
}
protected open val onCancelMode: Int get() = ON_CANCEL_MAKE_CANCELLING
public override fun cancel(cause: Throwable?): Boolean = when (onCancelMode) {
ON_CANCEL_MAKE_CANCELLED -> makeCancelled(cause)
ON_CANCEL_MAKE_CANCELLING -> makeCancelling(cause)
ON_CANCEL_MAKE_COMPLETING -> makeCompletingOnCancel(cause)
else -> error("Invalid onCancelMode $onCancelMode")
}
// we will be dispatching coroutine to process its cancellation exception, so there is no need for
// an extra check for Job status in MODE_CANCELLABLE
private fun updateStateCancelled(cause: Throwable?) =
updateState(Cancelled(this, cause), mode = MODE_ATOMIC_DEFAULT)
// transitions to Cancelled state
private fun makeCancelled(cause: Throwable?): Boolean {
if (state !is Incomplete) return false // quit if already complete
updateStateCancelled(cause)
return true
}
// transitions to Cancelling state
private fun makeCancelling(cause: Throwable?): Boolean {
while (true) {
val state = this.state
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
promoteEmptyToNodeList(state) // this way can wrap it into Cancelling on next pass
} else {
// cancelling a non-started coroutine makes it immediately cancelled
// (and we have no listeners to notify which makes it very simple)
updateStateCancelled(cause)
return true
}
}
is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
promoteSingleToNodeList(state)
}
is NodeList -> { // LIST -- a list of completion handlers (either new or active)
if (state.isActive) {
makeCancellingList(state.list, cause)
return true
} else {
// cancelling a non-started coroutine makes it immediately cancelled
updateStateCancelled(cause)
return true
}
}
is Finishing -> { // Completing/Cancelling the job, may cancel
if (state.cancelled != null) return false // already cancelling
makeCancellingList(state.list, cause)
return true
}
else -> { // is inactive
return false
}
}
}
}
// make expected state in cancelling
private fun makeCancellingList(list: NodeList, cause: Throwable?) {
val cancelled = Cancelled(this, cause)
state = Finishing(list, cancelled, false)
notifyCancellation(list, cause)
onCancellation(cancelled)
}
private fun makeCompletingOnCancel(cause: Throwable?): Boolean =
makeCompleting(Cancelled(this, cause))
internal fun makeCompleting(proposedUpdate: Any?): Boolean =
when (makeCompletingInternal(proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
COMPLETING_ALREADY_COMPLETING -> false
else -> true
}
/**
* Returns:
* * `true` if state was updated to completed/cancelled;
* * `false` if made completing or it is cancelling and is waiting for children.
*
* @throws IllegalStateException if job is already complete or completing
* @suppress **This is unstable API and it is subject to change.**
*/
internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean =
when (makeCompletingInternal(proposedUpdate, mode)) {
COMPLETING_COMPLETED -> true
COMPLETING_WAITING_CHILDREN -> false
else -> throw IllegalStateException("Job $this is already complete or completing, " +
"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
}
private fun makeCompletingInternal(proposedUpdate: Any?, mode: Int): Int {
loop@ while (true) {
val state = this.state
@Suppress("FoldInitializerAndIfToElvis")
if (state !is Incomplete)
return COMPLETING_ALREADY_COMPLETING
if (state is Finishing && state.completing)
return COMPLETING_ALREADY_COMPLETING
val child: Child = firstChild(state) ?: run {
// or else complete immediately w/o children
updateState(proposedUpdate, mode)
return COMPLETING_COMPLETED
}
// must promote to list to correct operate on child lists
if (state is JobNode<*>) {
promoteSingleToNodeList(state)
continue@loop // retry
}
// cancel all children in list on exceptional completion
if (proposedUpdate is CompletedExceptionally)
child.cancelChildrenInternal(proposedUpdate.exception)
// switch to completing state
val completing = Finishing(state.list!!, (state as? Finishing)?.cancelled, true)
this.state = completing
if (tryWaitForChild(child, proposedUpdate))
return COMPLETING_WAITING_CHILDREN
updateState(proposedUpdate, mode = MODE_ATOMIC_DEFAULT)
return COMPLETING_COMPLETED
}
}
private tailrec fun Child.cancelChildrenInternal(cause: Throwable) {
childJob.cancel(JobCancellationException("Child job was cancelled because of parent failure", cause, childJob))
nextChild()?.cancelChildrenInternal(cause)
}
private val Any?.exceptionOrNull: Throwable?
get() = (this as? CompletedExceptionally)?.exception
private fun firstChild(state: Incomplete) =
state as? Child ?: state.list?.nextChild()
// return false when there is no more incomplete children to wait
private tailrec fun tryWaitForChild(child: Child, proposedUpdate: Any?): Boolean {
val handle = child.childJob.invokeOnCompletion(invokeImmediately = false) {
continueCompleting(child, proposedUpdate)
}
if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
val nextChild = child.nextChild() ?: return false
return tryWaitForChild(nextChild, proposedUpdate)
}
internal fun continueCompleting(lastChild: Child, proposedUpdate: Any?) {
val state = this.state
@Suppress("FoldInitializerAndIfToElvis")
if (state !is Finishing)
throw IllegalStateException("Job $this is found in expected state while completing with $proposedUpdate", proposedUpdate.exceptionOrNull)
// figure out if we need to wait for next child
val waitChild = lastChild.nextChild()
// try wait for next child
if (waitChild != null && tryWaitForChild(waitChild, proposedUpdate)) return // waiting for next child
// no more children to wait -- update state
updateState(proposedUpdate, mode = MODE_ATOMIC_DEFAULT)
}
private fun LinkedListNode.nextChild(): Child? {
var cur = this
while (cur.isRemoved) cur = cur.prev // rollback to prev non-removed (or list head)
while (true) {
cur = cur.next
if (cur is Child) return cur
if (cur is NodeList) return null // checked all -- no more children
}
}
override val children: Sequence<Job> get() = buildSequence<Job> {
val state = this@JobSupport.state
when (state) {
is Child -> yield(state.childJob)
is Incomplete -> state.list?.let { list ->
list.forEach<Child> { yield(it.childJob) }
}
}
}
@Suppress("OverridingDeprecatedMember")
override fun attachChild(child: Job): DisposableHandle =
installNode(onCancelling = true, invokeImmediately = true, node = Child(this, child))
/**
* Override to process any exceptions that were encountered while invoking completion handlers
* installed via [invokeOnCompletion].
*/
protected open fun handleException(exception: Throwable) {
throw exception
}
/**
* It is invoked once when job is cancelled or is completed, similarly to [invokeOnCompletion] with
* `onCancelling` set to `true`.
* @param exceptionally not null when the the job was cancelled or completed exceptionally,
* null when it has completed normally.
* @suppress **This is unstable API and it is subject to change.**
*/
protected open fun onCancellation(exceptionally: CompletedExceptionally?) {}
/**
* Override for post-completion actions that need to do something with the state.
* @param mode completion mode.
* @suppress **This is unstable API and it is subject to change.**
*/
protected open fun afterCompletion(state: Any?, mode: Int) {}
// for nicer debugging
override fun toString(): String = "Job{${stateString()}}"
protected fun stateString(): String {
val state = this.state
return when (state) {
is Finishing -> buildString {
if (state.cancelled != null) append("Cancelling")
if (state.completing) append("Completing")
}
is Incomplete -> if (state.isActive) "Active" else "New"
is Cancelled -> "Cancelled"
is CompletedExceptionally -> "CompletedExceptionally"
else -> "Completed"
}
}
/**
* @suppress **This is unstable API and it is subject to change.**
*/
internal interface Incomplete {
val isActive: Boolean
val list: NodeList? // is null only for Empty and JobNode incomplete state objects
}
// Cancelling or Completing
private class Finishing(
override val list: NodeList,
val cancelled: Cancelled?, /* != null when cancelling */
val completing: Boolean /* true when completing */
) : Incomplete {
override val isActive: Boolean get() = cancelled == null
}
private val Incomplete.isCancelling: Boolean
get() = this is Finishing && cancelled != null
/**
* @suppress **This is unstable API and it is subject to change.**
*/
internal class NodeList(
override var isActive: Boolean
) : LinkedListHead(), Incomplete {
override val list: NodeList get() = this
fun makeActive(): Boolean {
if (isActive) return false
isActive = true
return true
}
override fun toString(): String = buildString {
append("List")
append(if (isActive) "{Active}" else "{New}")
append("[")
var first = true
this@NodeList.forEach<JobNode<*>> { node ->
if (first) first = false else append(", ")
append(node)
}
append("]")
}
}
/*
* =================================================================================================
* This is ready-to-use implementation for Deferred interface.
* However, it is not type-safe. Conceptually it just exposes the value of the underlying
* completed state as `Any?`
* =================================================================================================
*/
public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
public fun getCompletionExceptionOrNull(): Throwable? {
val state = this.state
check(state !is Incomplete) { "This job has not completed yet" }
return state.exceptionOrNull
}
protected fun getCompletedInternal(): Any? {
val state = this.state
check(state !is Incomplete) { "This job has not completed yet" }
if (state is CompletedExceptionally) throw state.exception
return state
}
protected suspend fun awaitInternal(): Any? {
val state = this.state
if (state !is Incomplete) {
// already complete -- just return result
if (state is CompletedExceptionally) throw state.exception
return state
}
start()
return awaitSuspend() // slow-path
}
private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
val handle = invokeOnCompletion {
val state = this.state
check(state !is Incomplete) { "State should be complete "}
if (state is CompletedExceptionally)
cont.resumeWithException(state.exception)
else
cont.resume(state)
}
cont.invokeOnCompletion { handle.dispose() }
}
}
internal const val ON_CANCEL_MAKE_CANCELLED = 0
internal const val ON_CANCEL_MAKE_CANCELLING = 1
internal const val ON_CANCEL_MAKE_COMPLETING = 2
private const val COMPLETING_ALREADY_COMPLETING = 0
private const val COMPLETING_COMPLETED = 1
private const val COMPLETING_WAITING_CHILDREN = 2
@Suppress("PrivatePropertyName")
private val EmptyNew = Empty(false)
@Suppress("PrivatePropertyName")
private val EmptyActive = Empty(true)
private class Empty(override val isActive: Boolean) : JobSupport.Incomplete {
override val list: JobSupport.NodeList? get() = null
override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
}
private class JobImpl(parent: Job? = null) : JobSupport(true) {
init { initParentJob(parent) }
override val onCancelMode: Int get() = ON_CANCEL_MAKE_COMPLETING
}
// -------- invokeOnCompletion nodes
internal abstract class JobNode<out J : Job>(
val job: J
) : LinkedListNode(), DisposableHandle, JobSupport.Incomplete {
final override val isActive: Boolean get() = true
final override val list: JobSupport.NodeList? get() = null
final override fun dispose() = (job as JobSupport).removeNode(this)
abstract fun invoke(reason: Throwable?) // CompletionHandler -- invoked on completion
}
private class InvokeOnCompletion(
job: Job,
private val handler: CompletionHandler
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) = handler.invoke(reason)
override fun toString() = "InvokeOnCompletion"
}
// -------- invokeOnCancellation nodes
/**
* Marker for node that shall be invoked on cancellation (in _cancelling_ state).
* **Note: may be invoked multiple times during cancellation.**
*/
internal abstract class JobCancellationNode<out J : Job>(job: J) : JobNode<J>(job)
private class InvokeOnCancellation(
job: Job,
private val handler: CompletionHandler
) : JobCancellationNode<Job>(job) {
// delegate handler shall be invoked at most once, so here is an additional flag
private var invoked = false
override fun invoke(reason: Throwable?) {
if (invoked) return
invoked = true
handler.invoke(reason)
}
override fun toString() = "InvokeOnCancellation"
}
internal class Child(
parent: JobSupport,
val childJob: Job
) : JobCancellationNode<JobSupport>(parent) {
override fun invoke(reason: Throwable?) {
// Always materialize the actual instance of parent's completion exception and cancel child with it
childJob.cancel(job.getCancellationException())
}
override fun toString(): String = "Child[$childJob]"
}