| /* |
| * Copyright 2016-2017 JetBrains s.r.o. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kotlinx.coroutines.experimental |
| |
| import kotlin.coroutines.experimental.Continuation |
| import kotlin.coroutines.experimental.CoroutineContext |
| import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn |
| import kotlin.coroutines.experimental.suspendCoroutine |
| |
| // --------------- cancellable continuations --------------- |
| |
| /** |
| * Cancellable continuation. Its job is _completed_ when it is resumed or cancelled. |
| * When [cancel] function is explicitly invoked, this continuation immediately resumes with [CancellationException] or |
| * with the specified cancel cause. |
| * |
| * Cancellable continuation has three states (as subset of [Job] states): |
| * |
| * | **State** | [isActive] | [isCompleted] | [isCancelled] | |
| * | ----------------------------------- | ---------- | ------------- | ------------- | |
| * | _Active_ (initial state) | `true` | `false` | `false` | |
| * | _Resumed_ (final _completed_ state) | `false` | `true` | `false` | |
| * | _Canceled_ (final _completed_ state)| `false` | `true` | `true` | |
| * |
| * Invocation of [cancel] transitions this continuation from _active_ to _cancelled_ state, while |
| * invocation of [resume] or [resumeWithException] transitions it from _active_ to _resumed_ state. |
| * |
| * A [cancelled][isCancelled] continuation implies that it is [completed][isCompleted]. |
| * |
| * Invocation of [resume] or [resumeWithException] in _resumed_ state produces [IllegalStateException] |
| * but is ignored in _cancelled_ state. |
| * |
| * ``` |
| * +-----------+ resume +---------+ |
| * | Active | ----------> | Resumed | |
| * +-----------+ +---------+ |
| * | |
| * | cancel |
| * V |
| * +-----------+ |
| * | Cancelled | |
| * +-----------+ |
| * |
| * ``` |
| */ |
| public actual interface CancellableContinuation<in T> : Continuation<T> { |
| /** |
| * Returns `true` when this continuation is active -- it has not completed or cancelled yet. |
| */ |
| public actual val isActive: Boolean |
| |
| /** |
| * Returns `true` when this continuation has completed for any reason. A continuation |
| * that was cancelled is also considered complete. |
| */ |
| public actual val isCompleted: Boolean |
| |
| /** |
| * Returns `true` if this continuation was [cancelled][cancel]. |
| * |
| * It implies that [isActive] is `false` and [isCompleted] is `true`. |
| */ |
| public actual val isCancelled: Boolean |
| |
| /** |
| * Tries to resume this continuation with a given value and returns non-null object token if it was successful, |
| * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned, |
| * [completeResume] must be invoked with it. |
| * |
| * When [idempotent] is not `null`, this function performs _idempotent_ operation, so that |
| * further invocations with the same non-null reference produce the same result. |
| * |
| * @suppress **This is unstable API and it is subject to change.** |
| */ |
| public actual fun tryResume(value: T, idempotent: Any? = null): Any? |
| |
| /** |
| * Tries to resume this continuation with a given exception and returns non-null object token if it was successful, |
| * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned, |
| * [completeResume] must be invoked with it. |
| * |
| * @suppress **This is unstable API and it is subject to change.** |
| */ |
| public fun tryResumeWithException(exception: Throwable): Any? |
| |
| /** |
| * Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result. |
| * |
| * @suppress **This is unstable API and it is subject to change.** |
| */ |
| public actual fun completeResume(token: Any) |
| |
| /** |
| * Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to |
| * [suspendCancellableCoroutine] function. It throws [IllegalStateException] if invoked more than once. |
| */ |
| public actual fun initCancellability() |
| |
| /** |
| * Cancels this continuation with an optional cancellation [cause]. The result is `true` if this continuation was |
| * cancelled as a result of this invocation and `false` otherwise. |
| */ |
| public actual fun cancel(cause: Throwable? = null): Boolean |
| |
| /** |
| * Registers handler that is **synchronously** invoked once on completion of this continuation. |
| * When continuation is already complete, then the handler is immediately invoked |
| * with continuation's exception or `null`. Otherwise, handler will be invoked once when this |
| * continuation is complete. |
| * |
| * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the |
| * registration of this handler and release its memory if its invocation is no longer needed. |
| * There is no need to dispose the handler after completion of this continuation. The references to |
| * all the handlers are released when this continuation completes. |
| * |
| * Installed [handler] should not throw any exceptions. If it does, they will get caught, |
| * wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code. |
| */ |
| public actual fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle |
| |
| /** |
| * Resumes this continuation with a given [value] in the invoker thread without going though |
| * [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context]. |
| * This function is designed to be used only by the [CoroutineDispatcher] implementations themselves. |
| * **It should not be used in general code**. |
| */ |
| public actual fun CoroutineDispatcher.resumeUndispatched(value: T) |
| |
| /** |
| * Resumes this continuation with a given [exception] in the invoker thread without going though |
| * [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context]. |
| * This function is designed to be used only by the [CoroutineDispatcher] implementations themselves. |
| * **It should not be used in general code**. |
| */ |
| public actual fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) |
| } |
| |
| /** |
| * Suspends coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to |
| * the [block]. This function throws [CancellationException] if the coroutine is cancelled or completed while suspended. |
| * |
| * If [holdCancellability] optional parameter is `true`, then the coroutine is suspended, but it is not |
| * cancellable until [CancellableContinuation.initCancellability] is invoked. |
| * |
| * See [suspendAtomicCancellableCoroutine] for suspending functions that need *atomic cancellation*. |
| */ |
| public actual inline suspend fun <T> suspendCancellableCoroutine( |
| holdCancellability: Boolean = false, |
| crossinline block: (CancellableContinuation<T>) -> Unit |
| ): T = |
| suspendCoroutineOrReturn { cont -> |
| val cancellable = CancellableContinuationImpl(cont, resumeMode = MODE_CANCELLABLE) |
| if (!holdCancellability) cancellable.initCancellability() |
| block(cancellable) |
| cancellable.getResult() |
| } |
| |
| /** |
| * Suspends coroutine similar to [suspendCancellableCoroutine], but with *atomic cancellation*. |
| * |
| * When suspended function throws [CancellationException] it means that the continuation was not resumed. |
| * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may |
| * continue to execute even after it was cancelled from the same thread in the case when the continuation |
| * was already resumed and was posted for execution to the thread's queue. |
| */ |
| public actual inline suspend fun <T> suspendAtomicCancellableCoroutine( |
| holdCancellability: Boolean = false, |
| crossinline block: (CancellableContinuation<T>) -> Unit |
| ): T = |
| suspendCoroutineOrReturn { cont -> |
| val cancellable = CancellableContinuationImpl(cont, resumeMode = MODE_ATOMIC_DEFAULT) |
| if (!holdCancellability) cancellable.initCancellability() |
| block(cancellable) |
| cancellable.getResult() |
| } |
| |
| // --------------- implementation details --------------- |
| |
| @PublishedApi |
| internal class CancellableContinuationImpl<in T>( |
| delegate: Continuation<T>, |
| resumeMode: Int |
| ) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T> { |
| @Volatile // just in case -- we don't want an extra data race, even benign one |
| private var _context: CoroutineContext? = null // created on first need |
| |
| public override val context: CoroutineContext |
| get() = _context ?: (delegate.context + this).also { _context = it } |
| |
| override fun initCancellability() { |
| initParentJob(delegate.context[Job]) |
| } |
| |
| override val onCancelMode: Int get() = ON_CANCEL_MAKE_CANCELLED |
| |
| override fun tryResume(value: T, idempotent: Any?): Any? { |
| val state = this.state |
| return when (state) { |
| is Incomplete -> { |
| val update: Any? = if (idempotent == null) value else |
| CompletedIdempotentResult(idempotent, value, state) |
| tryUpdateState(update) |
| state |
| } |
| is CompletedIdempotentResult -> { |
| if (state.idempotentResume === idempotent) { |
| check(state.result === value) { "Non-idempotent resume" } |
| state.token |
| } else |
| null |
| } |
| else -> null // cannot resume -- not active anymore |
| } |
| } |
| |
| override fun tryResumeWithException(exception: Throwable): Any? { |
| val state = this.state |
| return when (state) { |
| is Incomplete -> { |
| tryUpdateState(CompletedExceptionally(exception)) |
| state |
| } |
| else -> null // cannot resume -- not active anymore |
| } |
| } |
| |
| override fun completeResume(token: Any) { |
| completeUpdateState(token as Incomplete, state, resumeMode) |
| } |
| |
| override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle = |
| invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler) |
| |
| override fun CoroutineDispatcher.resumeUndispatched(value: T) { |
| val dc = delegate as? DispatchedContinuation |
| resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode) |
| } |
| |
| override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) { |
| val dc = delegate as? DispatchedContinuation |
| resumeImpl(CompletedExceptionally(exception), if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode) |
| } |
| |
| @Suppress("UNCHECKED_CAST") |
| override fun <T> getSuccessfulResult(state: Any?): T = |
| if (state is CompletedIdempotentResult) state.result as T else state as T |
| |
| override fun toString(): String = |
| "CancellableContinuation{${stateString()}}[$delegate]" |
| |
| // todo: This workaround for KT-21968, should be removed in the future |
| public override fun cancel(cause: Throwable?): Boolean = |
| super.cancel(cause) |
| } |
| |
| private class CompletedIdempotentResult( |
| val idempotentResume: Any?, |
| val result: Any?, |
| val token: JobSupport.Incomplete |
| ) { |
| override fun toString(): String = "CompletedIdempotentResult[$result]" |
| } |
| |