blob: 450163c668c09cdace5875b870057d75db964d65 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*
/**
 * Sentinel marking that a [DispatchedContinuation] currently holds no pending resumption state.
 * It is the initial value of `DispatchedContinuation._state`, and `takeState` both checks against it
 * (failing fast when the state was already taken) and writes it back after reading the state.
 */
@Suppress("PrivatePropertyName")
@SharedImmutable
private val UNDEFINED = Symbol("UNDEFINED")
/**
 * Runs [block] within the current thread's event loop, or queues this continuation into that loop
 * when the unconfined loop is already running.
 *
 * @param contState state to store into `_state` when the continuation has to be queued.
 * @param mode resume mode to store into `resumeMode` when the continuation has to be queued.
 * @param doYield whether the current continuation is yielding; enables a fast path that returns
 * immediately when the unconfined queue is empty.
 * @param block action that resumes the continuation in place when the loop was not yet active.
 * @return `true` if execution of the continuation was queued (trampolined), `false` otherwise.
 */
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    val loop = ThreadLocalEventLoop.eventLoop
    // Fast path: a yielding continuation has nothing to yield to when the unconfined queue is empty
    if (doYield && loop.isUnconfinedQueueEmpty) return false
    if (!loop.isUnconfinedLoopActive) {
        // Nobody is running the loop yet -- run it here until all unconfined tasks are executed
        runUnconfinedEventLoop(loop, block = block)
        return false
    }
    // The loop is already running further up the stack -- enqueue instead of recursing
    // so that deeply nested resumptions cannot overflow the stack
    _state = contState
    resumeMode = mode
    loop.dispatchUnconfined(this)
    return true
}
/**
 * Resumes this task without a dispatcher hop: queues it into the current thread's event loop when
 * the unconfined loop is already active, otherwise resumes it in place and drains the loop.
 */
private fun DispatchedTask<*>.resumeUnconfined() {
    val loop = ThreadLocalEventLoop.eventLoop
    if (!loop.isUnconfinedLoopActive) {
        // Loop was not active -- resume right here and keep going until all unconfined tasks are executed
        runUnconfinedEventLoop(loop) {
            resume(delegate, MODE_UNDISPATCHED)
        }
        return
    }
    // Loop is active -- hand the task over to it to avoid stack overflow
    loop.dispatchUnconfined(this)
}
/**
 * Executes [block] and then keeps processing unconfined events of [eventLoop] until
 * `processUnconfinedEvent` reports that there is nothing more to process.
 *
 * The loop's use count is incremented (with `unconfined = true`) for the duration of the call and
 * always decremented afterwards. Any [Throwable] thrown by [block] or by event processing is
 * forwarded to `handleFatalException` instead of propagating to the caller.
 */
private inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        // Stop once all queued unconfined continuations were executed
        while (eventLoop.processUnconfinedEvent()) {
            // keep draining the queue
        }
    } catch (e: Throwable) {
        // Not expected during normal operation -- only if there is a bug in the implementation,
        // so it is reported as a fatal exception.
        handleFatalException(e, null)
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}
/**
 * A [Continuation] that resumes the wrapped [continuation] through [dispatcher].
 *
 * On resumption the result is stored in [_state] and this object is submitted to the dispatcher as a
 * task; when the dispatcher reports that no dispatch is needed, the resumption goes through
 * `executeUnconfined` instead, which either runs it in place or queues it into the thread's event loop.
 * All other [Continuation] members are delegated to [continuation].
 */
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
    /** Pending resumption state, or `UNDEFINED` when there is none; consumed by [takeState]. */
    @JvmField
    @Suppress("PropertyName")
    internal var _state: Any? = UNDEFINED

    override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
    override fun getStackTraceElement(): StackTraceElement? = null

    @JvmField // pre-cached value to avoid ctx.fold on every resumption
    internal val countOrElement = threadContextElements(context)

    /**
     * Returns the pending state and resets [_state] to `UNDEFINED`.
     *
     * @throws IllegalStateException if no state is pending (e.g. on a repeated invocation).
     */
    override fun takeState(): Any? {
        val state = _state
        check(state !== UNDEFINED) // fail-fast if repeatedly invoked
        _state = UNDEFINED
        return state
    }

    override val delegate: Continuation<T>
        get() = this

    /**
     * Resumes with [result] in `MODE_ATOMIC_DEFAULT`: dispatches this task when the dispatcher
     * requires it, otherwise resumes [continuation] via `executeUnconfined`.
     */
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC_DEFAULT
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }

    /**
     * Resumes with [value] in `MODE_CANCELLABLE`. On the undispatched path the continuation is
     * resumed with [value] only if [resumeCancelled] did not already resume it with a cancellation exception.
     */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeCancellable(value: T) {
        if (dispatcher.isDispatchNeeded(context)) {
            _state = value
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(value, MODE_CANCELLABLE) {
                if (!resumeCancelled()) {
                    resumeUndispatched(value)
                }
            }
        }
    }

    /**
     * Resumes with [exception] in `MODE_CANCELLABLE`. On the undispatched path the continuation is
     * resumed with [exception] only if [resumeCancelled] did not already resume it with a cancellation exception.
     */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeCancellableWithException(exception: Throwable) {
        val context = continuation.context
        val state = CompletedExceptionally(exception)
        if (dispatcher.isDispatchNeeded(context)) {
            // Reuse the wrapper created above rather than allocating a second identical one
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled()) {
                    resumeUndispatchedWithException(exception)
                }
            }
        }
    }

    /**
     * Checks the [Job] of the context, if there is one: when it is no longer active, resumes with its
     * cancellation exception.
     *
     * @return `true` if the continuation was resumed with the cancellation exception, `false` otherwise.
     */
    @Suppress("NOTHING_TO_INLINE")
    inline fun resumeCancelled(): Boolean {
        val job = context[Job]
        if (job != null && !job.isActive) {
            resumeWithException(job.getCancellationException())
            return true
        }
        return false
    }

    /** Resumes [continuation] with [value] right away, inside `withCoroutineContext`. */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeUndispatched(value: T) {
        withCoroutineContext(context, countOrElement) {
            continuation.resume(value)
        }
    }

    /** Resumes [continuation] with [exception] right away, inside `withCoroutineContext`. */
    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeUndispatchedWithException(exception: Throwable) {
        withCoroutineContext(context, countOrElement) {
            continuation.resumeWithStackTrace(exception)
        }
    }

    // used by "yield" implementation
    internal fun dispatchYield(value: T) {
        val context = continuation.context
        _state = value
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatchYield(context, this)
    }

    override fun toString(): String =
        "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
}
/**
 * Resumes this continuation with [value]: a [DispatchedContinuation] is resumed through its own
 * cancellable resumption, any other continuation is resumed as is.
 */
internal fun <T> Continuation<T>.resumeCancellable(value: T) {
    if (this is DispatchedContinuation) {
        resumeCancellable(value)
    } else {
        resume(value)
    }
}
/**
 * Resumes this continuation with [exception]: a [DispatchedContinuation] is resumed through its own
 * cancellable resumption, any other continuation is resumed via `resumeWithStackTrace`.
 */
internal fun <T> Continuation<T>.resumeCancellableWithException(exception: Throwable) {
    if (this is DispatchedContinuation) {
        resumeCancellableWithException(exception)
    } else {
        resumeWithStackTrace(exception)
    }
}
/**
 * Resumes with [value] bypassing the dispatcher: for a [DispatchedContinuation] the wrapped
 * continuation is resumed, any other continuation is resumed as is.
 */
internal fun <T> Continuation<T>.resumeDirect(value: T) {
    if (this is DispatchedContinuation) {
        continuation.resume(value)
    } else {
        resume(value)
    }
}
/**
 * Resumes with [exception] bypassing the dispatcher: for a [DispatchedContinuation] the wrapped
 * continuation is resumed, any other continuation is resumed as is. Both paths use `resumeWithStackTrace`.
 */
internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable) {
    if (this is DispatchedContinuation) {
        continuation.resumeWithStackTrace(exception)
    } else {
        resumeWithStackTrace(exception)
    }
}
/**
 * Base class of tasks that resume a [delegate] continuation with a state obtained from [takeState].
 *
 * [resumeMode] is consulted by [run] to decide whether the [Job] of the continuation's context
 * has to be checked for cancellation before resuming.
 */
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    /** Continuation resumed by this task. */
    internal abstract val delegate: Continuation<T>

    /** Hands out the state this task has to resume with. */
    internal abstract fun takeState(): Any?

    /** Called by [run] right before resuming with cancellation [cause] instead of [state]; does nothing by default. */
    internal open fun cancelResult(state: Any?, cause: Throwable) {}

    /** Interprets [state] as a successful result; by default it is just cast to the requested type. */
    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T {
        return state as T
    }

    /** Returns the failure carried by [state] when it is a [CompletedExceptionally], or `null` otherwise. */
    internal fun getExceptionalResult(state: Any?): Throwable? =
        if (state is CompletedExceptionally) state.cause else null

    /**
     * Takes the state and resumes the continuation wrapped by [delegate] inside `withCoroutineContext`.
     * Anything thrown along the way, as well as a failure of `taskContext.afterTask()`, is passed to
     * [handleFatalException].
     */
    public final override fun run() {
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val dispatched = delegate as DispatchedContinuation<T>
            val target = dispatched.continuation
            val context = target.context
            val state = takeState() // NOTE: Must take state in any case, even if cancelled
            withCoroutineContext(context, dispatched.countOrElement) {
                val exception = getExceptionalResult(state)
                val job = if (resumeMode.isCancellableMode) context[Job] else null
                when {
                    // An exception the continuation was originally resumed with dominates cancellation,
                    // otherwise that original exception would be silently lost.
                    exception != null -> target.resumeWithStackTrace(exception)
                    job != null && !job.isActive -> {
                        val cause = job.getCancellationException()
                        cancelResult(state, cause)
                        target.resumeWithStackTrace(cause)
                    }
                    else -> target.resume(getSuccessfulResult(state))
                }
            }
        } catch (e: Throwable) {
            // Plain try/catch rather than runCatching for a nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val afterTaskFailure = runCatching { taskContext.afterTask() }.exceptionOrNull()
            handleFatalException(fatalException, afterTaskFailure)
        }
    }

    /**
     * Entry point for handling fatal exceptions in kotlinx.coroutines.
     * Fatal exceptions come in two kinds:
     *
     * 1) Exceptions thrown by kotlinx.coroutines code itself. They indicate that either the library or
     *    the compiler has a bug that breaks internal invariants. They usually have specific workarounds,
     *    but require careful study of the cause and should be reported to the maintainers and fixed on
     *    the library's side anyway.
     *
     * 2) Exceptions thrown by [ThreadContextElement.updateThreadContext] and
     *    [ThreadContextElement.restoreThreadContext]. User code can trigger them by providing an improper
     *    implementation of [ThreadContextElement], but they cannot be ignored because they may leave the
     *    coroutine in an inconsistent state. If you encounter such an exception, either disable the
     *    context element or wrap it into another context element that catches all exceptions and handles
     *    them in an application-specific manner.
     *
     * Fatal exception handling can be intercepted with a [CoroutineExceptionHandler] element in the
     * context of the failed coroutine, but such exceptions should be reported anyway.
     *
     * When both arguments are `null` nothing happens. When both are present, [finallyException] is added
     * to [exception] as suppressed and [exception] is reported as the cause.
     */
    internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
        val cause = exception ?: finallyException ?: return
        if (exception !== null && finallyException !== null) {
            exception.addSuppressedThrowable(finallyException)
        }
        val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
            "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause)
        handleCoroutineException(this.delegate.context, reason)
    }
}
/**
 * Yields through the thread's unconfined event loop without involving the dispatcher.
 *
 * @return the result of `executeUnconfined` called with `doYield = true`, i.e. `true` when this
 * continuation was queued into an already active loop and `false` otherwise.
 */
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean {
    return executeUnconfined(contState = Unit, mode = MODE_CANCELLABLE, doYield = true) {
        run()
    }
}
/**
 * Resumes the [delegate][DispatchedTask.delegate] of this task according to [mode].
 *
 * The task itself is submitted to the delegate's dispatcher (or resumed via `resumeUnconfined` when
 * no dispatch is needed) only if [mode] is a dispatched mode, the delegate is a [DispatchedContinuation]
 * and [mode] agrees with the task's `resumeMode` on cancellability. Otherwise the slow path
 * `resume(delegate, mode)` is taken.
 */
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
    val delegate = this.delegate
    if (!mode.isDispatchedMode || delegate !is DispatchedContinuation<*> || mode.isCancellableMode != resumeMode.isCancellableMode) {
        // slow path -- resume through the delegate
        resume(delegate, mode)
        return
    }
    // fast path -- dispatch directly using this instance's Runnable implementation
    val dispatcher = delegate.dispatcher
    val context = delegate.context
    if (dispatcher.isDispatchNeeded(context)) {
        dispatcher.dispatch(context, this)
    } else {
        resumeUnconfined()
    }
}
/**
 * Slow-path resumption: takes the state of this task and resumes [delegate] with it in [useMode],
 * either exceptionally (when the state carries an exception) or with the successful result.
 */
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
    val taken = takeState()
    val failure = getExceptionalResult(taken)
    if (failure == null) {
        delegate.resumeMode(getSuccessfulResult(taken), useMode)
        return
    }
    delegate.resumeWithExceptionMode(failure, useMode)
}
/**
 * Resumes this continuation with a failure whose exception is [exception] passed through
 * `recoverStackTrace` for this continuation.
 */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}