| /* |
| * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| */ |
| |
| @file:JvmMultifileClass |
| @file:JvmName("BuildersKt") |
| @file:OptIn(ExperimentalContracts::class) |
| |
| package kotlinx.coroutines |
| |
| import kotlinx.atomicfu.* |
| import kotlinx.coroutines.internal.* |
| import kotlinx.coroutines.intrinsics.* |
| import kotlinx.coroutines.selects.* |
| import kotlin.contracts.* |
| import kotlin.coroutines.* |
| import kotlin.coroutines.intrinsics.* |
| import kotlin.jvm.* |
| |
| // --------------- launch --------------- |
| |
| /** |
| * Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a [Job]. |
| * The coroutine is cancelled when the resulting job is [cancelled][Job.cancel]. |
| * |
| * The coroutine context is inherited from a [CoroutineScope]. Additional context elements can be specified with [context] argument. |
| * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. |
| * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden |
| * with a corresponding [context] element. |
| * |
| * By default, the coroutine is immediately scheduled for execution. |
| * Other start options can be specified via `start` parameter. See [CoroutineStart] for details. |
| * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case, |
| * the coroutine [Job] is created in _new_ state. It can be explicitly started with [start][Job.start] function |
| * and will be started implicitly on the first invocation of [join][Job.join]. |
| * |
| * Uncaught exceptions in this coroutine cancel the parent job in the context by default |
| * (unless [CoroutineExceptionHandler] is explicitly specified), which means that when `launch` is used with |
| * the context of another coroutine, then any uncaught exception leads to the cancellation of the parent coroutine. |
| * |
| * See [newCoroutineContext] for a description of debugging facilities that are available for a newly created coroutine. |
| * |
| * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine. |
| * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT]. |
| * @param block the coroutine code which will be invoked in the context of the provided scope. |
| **/ |
| public fun CoroutineScope.launch( |
| context: CoroutineContext = EmptyCoroutineContext, |
| start: CoroutineStart = CoroutineStart.DEFAULT, |
| block: suspend CoroutineScope.() -> Unit |
| ): Job { |
| val newContext = newCoroutineContext(context) |
| val coroutine = if (start.isLazy) |
| LazyStandaloneCoroutine(newContext, block) else |
| StandaloneCoroutine(newContext, active = true) |
| coroutine.start(start, coroutine, block) |
| return coroutine |
| } |
| |
| // --------------- async --------------- |
| |
| /** |
| * Creates a coroutine and returns its future result as an implementation of [Deferred]. |
| * The running coroutine is cancelled when the resulting deferred is [cancelled][Job.cancel]. |
| * The resulting coroutine has a key difference compared with similar primitives in other languages |
| * and frameworks: it cancels the parent job (or outer scope) on failure to enforce *structured concurrency* paradigm. |
| * To change that behaviour, supervising parent ([SupervisorJob] or [supervisorScope]) can be used. |
| * |
| * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument. |
| * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. |
| * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden |
| * with corresponding [context] element. |
| * |
| * By default, the coroutine is immediately scheduled for execution. |
| * Other options can be specified via `start` parameter. See [CoroutineStart] for details. |
| * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case, |
| * the resulting [Deferred] is created in _new_ state. It can be explicitly started with [start][Job.start] |
| * function and will be started implicitly on the first invocation of [join][Job.join], [await][Deferred.await] or [awaitAll]. |
| * |
| * @param block the coroutine code. |
| */ |
| public fun <T> CoroutineScope.async( |
| context: CoroutineContext = EmptyCoroutineContext, |
| start: CoroutineStart = CoroutineStart.DEFAULT, |
| block: suspend CoroutineScope.() -> T |
| ): Deferred<T> { |
| val newContext = newCoroutineContext(context) |
| val coroutine = if (start.isLazy) |
| LazyDeferredCoroutine(newContext, block) else |
| DeferredCoroutine<T>(newContext, active = true) |
| coroutine.start(start, coroutine, block) |
| return coroutine |
| } |
| |
| @Suppress("UNCHECKED_CAST") |
| private open class DeferredCoroutine<T>( |
| parentContext: CoroutineContext, |
| active: Boolean |
| ) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> { |
| override fun getCompleted(): T = getCompletedInternal() as T |
| override suspend fun await(): T = awaitInternal() as T |
| override val onAwait: SelectClause1<T> get() = this |
| override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) = |
| registerSelectClause1Internal(select, block) |
| } |
| |
| private class LazyDeferredCoroutine<T>( |
| parentContext: CoroutineContext, |
| block: suspend CoroutineScope.() -> T |
| ) : DeferredCoroutine<T>(parentContext, active = false) { |
| private val continuation = block.createCoroutineUnintercepted(this, this) |
| |
| override fun onStart() { |
| continuation.startCoroutineCancellable(this) |
| } |
| } |
| |
| // --------------- withContext --------------- |
| |
| /** |
| * Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns |
| * the result. |
| * |
| * The resulting context for the [block] is derived by merging the current [coroutineContext] with the |
| * specified [context] using `coroutineContext + context` (see [CoroutineContext.plus]). |
| * This suspending function is cancellable. It immediately checks for cancellation of |
| * the resulting context and throws [CancellationException] if it is not [active][CoroutineContext.isActive]. |
| * |
| * This function uses dispatcher from the new context, shifting execution of the [block] into the |
| * different thread if a new dispatcher is specified, and back to the original dispatcher |
| * when it completes. Note that the result of `withContext` invocation is |
| * dispatched into the original context in a cancellable way, which means that if the original [coroutineContext], |
| * in which `withContext` was invoked, is cancelled by the time its dispatcher starts to execute the code, |
| * it discards the result of `withContext` and throws [CancellationException]. |
| */ |
| public suspend fun <T> withContext( |
| context: CoroutineContext, |
| block: suspend CoroutineScope.() -> T |
| ): T { |
| contract { |
| callsInPlace(block, InvocationKind.EXACTLY_ONCE) |
| } |
| return suspendCoroutineUninterceptedOrReturn sc@ { uCont -> |
| // compute new context |
| val oldContext = uCont.context |
| val newContext = oldContext + context |
| // always check for cancellation of new context |
| newContext.checkCompletion() |
| // FAST PATH #1 -- new context is the same as the old one |
| if (newContext === oldContext) { |
| val coroutine = ScopeCoroutine(newContext, uCont) |
| return@sc coroutine.startUndispatchedOrReturn(coroutine, block) |
| } |
| // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed) |
| // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher) |
| if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) { |
| val coroutine = UndispatchedCoroutine(newContext, uCont) |
| // There are changes in the context, so this thread needs to be updated |
| withCoroutineContext(newContext, null) { |
| return@sc coroutine.startUndispatchedOrReturn(coroutine, block) |
| } |
| } |
| // SLOW PATH -- use new dispatcher |
| val coroutine = DispatchedCoroutine(newContext, uCont) |
| coroutine.initParentJob() |
| block.startCoroutineCancellable(coroutine, coroutine) |
| coroutine.getResult() |
| } |
| } |
| |
| /** |
| * Calls the specified suspending block with the given [CoroutineDispatcher], suspends until it |
| * completes, and returns the result. |
| * |
| * This inline function calls [withContext]. |
| */ |
| @ExperimentalCoroutinesApi |
| public suspend inline operator fun <T> CoroutineDispatcher.invoke( |
| noinline block: suspend CoroutineScope.() -> T |
| ): T = withContext(this, block) |
| |
| // --------------- implementation --------------- |
| |
| private open class StandaloneCoroutine( |
| parentContext: CoroutineContext, |
| active: Boolean |
| ) : AbstractCoroutine<Unit>(parentContext, active) { |
| override fun handleJobException(exception: Throwable): Boolean { |
| handleCoroutineException(context, exception) |
| return true |
| } |
| } |
| |
| private class LazyStandaloneCoroutine( |
| parentContext: CoroutineContext, |
| block: suspend CoroutineScope.() -> Unit |
| ) : StandaloneCoroutine(parentContext, active = false) { |
| private val continuation = block.createCoroutineUnintercepted(this, this) |
| |
| override fun onStart() { |
| continuation.startCoroutineCancellable(this) |
| } |
| } |
| |
| // Used by withContext when context changes, but dispatcher stays the same |
| private class UndispatchedCoroutine<in T>( |
| context: CoroutineContext, |
| uCont: Continuation<T> |
| ) : ScopeCoroutine<T>(context, uCont) { |
| override fun afterResume(state: Any?) { |
| // resume undispatched -- update context by stay on the same dispatcher |
| val result = recoverResult(state, uCont) |
| withCoroutineContext(uCont.context, null) { |
| uCont.resumeWith(result) |
| } |
| } |
| } |
| |
| private const val UNDECIDED = 0 |
| private const val SUSPENDED = 1 |
| private const val RESUMED = 2 |
| |
| // Used by withContext when context dispatcher changes |
| private class DispatchedCoroutine<in T>( |
| context: CoroutineContext, |
| uCont: Continuation<T> |
| ) : ScopeCoroutine<T>(context, uCont) { |
| // this is copy-and-paste of a decision state machine inside AbstractionContinuation |
| // todo: we may some-how abstract it via inline class |
| private val _decision = atomic(UNDECIDED) |
| |
| private fun trySuspend(): Boolean { |
| _decision.loop { decision -> |
| when (decision) { |
| UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true |
| RESUMED -> return false |
| else -> error("Already suspended") |
| } |
| } |
| } |
| |
| private fun tryResume(): Boolean { |
| _decision.loop { decision -> |
| when (decision) { |
| UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true |
| SUSPENDED -> return false |
| else -> error("Already resumed") |
| } |
| } |
| } |
| |
| override fun afterCompletion(state: Any?) { |
| // Call afterResume from afterCompletion and not vice-versa, because stack-size is more |
| // important for afterResume implementation |
| afterResume(state) |
| } |
| |
| override fun afterResume(state: Any?) { |
| if (tryResume()) return // completed before getResult invocation -- bail out |
| // Resume in a cancellable way because we have to switch back to the original dispatcher |
| uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont)) |
| } |
| |
| fun getResult(): Any? { |
| if (trySuspend()) return COROUTINE_SUSPENDED |
| // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state |
| val state = this.state.unboxState() |
| if (state is CompletedExceptionally) throw state.cause |
| @Suppress("UNCHECKED_CAST") |
| return state as T |
| } |
| } |