blob: 64d815bad787a0b50053be448f9f6bf2adc7ff82 [file] [log] [blame]
Roman Elizarov3754f952017-01-18 20:47:54 +03001package kotlinx.coroutines.experimental
2
Roman Elizarov7cf452e2017-01-29 21:58:33 +03003import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
Roman Elizarovea4a51b2017-01-31 12:01:25 +03004import kotlin.coroutines.experimental.Continuation
5import kotlin.coroutines.experimental.ContinuationInterceptor
6import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
7import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
8import kotlin.coroutines.experimental.suspendCoroutine
Roman Elizarov3754f952017-01-18 20:47:54 +03009
// --------------- cancellable continuations ---------------

/**
 * Cancellable continuation. Its job is completed when it is resumed or cancelled.
 * When the [cancel] function is explicitly invoked, this continuation resumes with [CancellationException].
 * If the cancel reason was not a [CancellationException], then the original exception is added as the cause of
 * the [CancellationException] that this continuation resumes with.
 */
public interface CancellableContinuation<in T> : Continuation<T>, Job {
    /**
     * Returns `true` if this continuation was cancelled. It implies that [isActive] is `false`.
     */
    val isCancelled: Boolean
}

/**
 * Suspends the coroutine similarly to [suspendCoroutine], but provides an implementation of
 * [CancellableContinuation] to the [block]. This function throws [CancellationException] if the coroutine is
 * cancelled while suspended.
 *
 * If the context of the calling coroutine contains a [Job] that is no longer active, the function aborts
 * before [block] is invoked (see [getParentJobOrAbort]).
 *
 * @param block receives the cancellable continuation to resume (or cancel) later.
 * @return the value the continuation was resumed with.
 */
public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
    suspendCoroutineOrReturn { cont ->
        val safe = SafeCancellableContinuation(cont, getParentJobOrAbort(cont))
        block(safe)
        // Either COROUTINE_SUSPENDED or the result, if the block has already resumed the continuation.
        safe.getResult()
    }

// --------------- implementation details ---------------

/**
 * Looks up the [Job] in the context of [cont].
 *
 * @param cont the continuation whose context is inspected.
 * @return the job found in the context, or `null` when the context has no job.
 * @throws Throwable the exception produced by `getInactiveCancellationException()` of the job when a job is
 * present but is not active.
 */
@PublishedApi
internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
    val job = cont.context[Job]
    // fast path when parent job is already complete (we don't even construct SafeCancellableContinuation object)
    if (job != null && !job.isActive) throw job.getInactiveCancellationException()
    return job
}

/**
 * [CancellableContinuation] implementation that wraps [delegate].
 *
 * The result is delivered either directly from [getResult] or later by resuming [delegate] from
 * [afterCompletion]; which of the two happens is settled by compare-and-set transitions of [decision].
 *
 * @param delegate the continuation that is resumed when completion is observed after [getResult] has
 * already decided to suspend.
 * @param parentJob the job handed to `initParentJob`; may be `null`.
 */
@PublishedApi
internal class SafeCancellableContinuation<in T>(
    private val delegate: Continuation<T>,
    private val parentJob: Job?
) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {
    // Current state of the getResult/afterCompletion hand-off. It is read as a volatile and only ever
    // changed through DECISION.compareAndSet, because afterCompletion may run concurrently with getResult.
    @Volatile
    private var decision = UNDECIDED

    private companion object {
        val DECISION: AtomicIntegerFieldUpdater<SafeCancellableContinuation<*>> =
            AtomicIntegerFieldUpdater.newUpdater(SafeCancellableContinuation::class.java, "decision")

        const val UNDECIDED = 0 // initial state: neither getResult nor afterCompletion has claimed the outcome
        const val SUSPENDED = 1 // set by getResult when it returns COROUTINE_SUSPENDED
        const val RESUMED = 2 // set by afterCompletion when it runs first; getResult then reads the state
        const val YIELD = 3 // used by cancellable "yield"
    }

    init { initParentJob(parentJob) }

    /**
     * Returns `COROUTINE_SUSPENDED` when the decision is still `UNDECIDED` (and is successfully moved to
     * `SUSPENDED`) or is `YIELD`. Otherwise returns the completed state, or throws its exception when the
     * state is [CompletedExceptionally].
     */
    fun getResult(): Any? {
        val decision = this.decision // volatile read
        when (decision) {
            UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
            YIELD -> return COROUTINE_SUSPENDED
        }
        // otherwise, afterCompletion was already invoked, and the result is in the state
        val state = getState()
        if (state is CompletedExceptionally) throw state.exception
        return state
    }

    override val isCancelled: Boolean
        get() = getState() is Cancelled

    /**
     * Hands the completed [state] over to the waiting side.
     *
     * If the decision is still `UNDECIDED` and is successfully moved to `RESUMED`, nothing else is done here
     * and [getResult] picks the state up. Otherwise [delegate] is resumed: with the exception for a
     * [CompletedExceptionally] state, through `resumeYield` when the decision read was `YIELD` and the
     * delegate is a [DispatchedContinuation], and with a plain `resume` in all other cases.
     */
    @Suppress("UNCHECKED_CAST")
    override fun afterCompletion(state: Any?) {
        val decision = this.decision // volatile read
        if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return // will get result in getResult
        // otherwise, getResult has already commenced, i.e. it was resumed later or in other thread
        when {
            state is CompletedExceptionally -> delegate.resumeWithException(state.exception)
            decision == YIELD && delegate is DispatchedContinuation -> delegate.resumeYield(parentJob, state as T)
            else -> delegate.resume(state as T)
        }
    }

    /**
     * Resumes this continuation with [value]. Beforehand, when the context's [ContinuationInterceptor] is a
     * [CoroutineDispatcher] whose `isDispatchNeeded` returns `true`, it tries to move the decision from
     * `UNDECIDED` to `YIELD`.
     */
    // can only be invoked in the same thread as getResult (see "yield"), afterCompletion may be concurrent
    fun resumeYield(value: T) {
        val dispatcher = context[ContinuationInterceptor] as? CoroutineDispatcher
        if (dispatcher?.isDispatchNeeded(context) == true)
            DECISION.compareAndSet(this, UNDECIDED, YIELD) // try mark as needing dispatch
        resume(value)
    }
}