blob: 4daf5f10b5913be35882c9b44befcf03fb26fa81 [file] [log] [blame]
Roman Elizarov3754f952017-01-18 20:47:54 +03001package kotlinx.coroutines.experimental
2
Roman Elizarov7cf452e2017-01-29 21:58:33 +03003import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
Roman Elizarovea4a51b2017-01-31 12:01:25 +03004import kotlin.coroutines.experimental.Continuation
5import kotlin.coroutines.experimental.ContinuationInterceptor
6import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
7import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
8import kotlin.coroutines.experimental.suspendCoroutine
Roman Elizarov3754f952017-01-18 20:47:54 +03009
// --------------- cancellable continuations ---------------
11
/**
 * A continuation that can be cancelled. Its job completes when the continuation is either resumed or cancelled.
 *
 * Explicitly invoking [cancel] makes this continuation resume with a [CancellationException]. When the cancel
 * reason is not itself a [CancellationException], the original exception is attached as the cause of the
 * [CancellationException] that this continuation resumes with.
 */
public interface CancellableContinuation<in T> : Continuation<T>, Job {
    /**
     * `true` once this continuation has been cancelled, which implies that [isActive] is `false`.
     */
    val isCancelled: Boolean

    /**
     * Attempts to resume this continuation with the given [value].
     *
     * The optional [onSuccess] callback is invoked with [value] as its argument after the state of this
     * continuation has been updated (so that it cannot be cancelled anymore), but before it is actually resumed.
     *
     * @return `true` if this call resumed the continuation, or `false` if it had already been
     * resumed or cancelled.
     */
    fun tryResume(value: T, onSuccess: ((Any?) -> Unit)? = null): Boolean

    /**
     * Makes this continuation cancellable. Intended to be used together with the optional
     * `holdCancellability` parameter of the [suspendCancellableCoroutine] function.
     *
     * @throws IllegalStateException if invoked more than once.
     */
    fun initCancellability()
}
Roman Elizarov3754f952017-01-18 20:47:54 +030039
/**
 * Suspends the coroutine like [suspendCoroutine] does, but hands the [block] an implementation of
 * [CancellableContinuation]. Throws [CancellationException] if the coroutine is cancelled while suspended.
 *
 * When the optional [holdCancellability] parameter is `true`, the coroutine is still suspended, but it does not
 * become cancellable until [CancellableContinuation.initCancellability] is invoked.
 */
public inline suspend fun <T> suspendCancellableCoroutine(
    holdCancellability: Boolean = false,
    crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineOrReturn { uCont ->
    // The parent job is checked first, so an already inactive parent aborts before the wrapper is allocated.
    val parent = getParentJobOrAbort(uCont)
    val cancellable = SafeCancellableContinuation(uCont, parent)
    if (!holdCancellability) {
        cancellable.initCancellability()
    }
    block(cancellable)
    // Either COROUTINE_SUSPENDED or the result, if the block has already completed the continuation.
    cancellable.getResult()
}
57
// --------------- implementation details ---------------
59
/**
 * Looks up the [Job] in the context of [cont].
 *
 * This is the fast path for an already inactive parent job: the exception is thrown here, before the caller
 * gets to construct a [SafeCancellableContinuation] object.
 *
 * @return the job found in the context, or `null` if the context has no job.
 * @throws Throwable the result of `getInactiveCancellationException()` when the job is present but not active.
 */
@PublishedApi
internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
    val parent = cont.context[Job] ?: return null
    if (parent.isActive) return parent
    throw parent.getInactiveCancellationException()
}
67
/**
 * Implementation of [CancellableContinuation] that wraps the [delegate] continuation and is used by
 * [suspendCancellableCoroutine].
 *
 * The [decision] field arbitrates between [getResult] and [afterCompletion]: whichever of them wins the
 * compare-and-set out of `UNDECIDED` determines whether the result is returned directly from [getResult]
 * or delivered by resuming the [delegate].
 */
@PublishedApi
internal class SafeCancellableContinuation<in T>(
    private val delegate: Continuation<T>,
    private val parentJob: Job?
) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {

    // Starts as UNDECIDED and is changed only via compare-and-set through the DECISION field updater.
    @Volatile
    private var decision = UNDECIDED

    override fun initCancellability() {
        initParentJob(parentJob)
    }

    /**
     * Returns [COROUTINE_SUSPENDED] when the continuation has not been completed yet (or when it was marked
     * as `YIELD`); otherwise returns the completed state, rethrowing its exception if it completed exceptionally.
     */
    fun getResult(): Any? {
        val observed = decision // volatile read
        val mustSuspend = observed == YIELD ||
            (observed == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, SUSPENDED))
        if (mustSuspend) return COROUTINE_SUSPENDED
        // otherwise, afterCompletion was already invoked, and the result is in the state
        val outcome = getState()
        if (outcome is CompletedExceptionally) {
            throw outcome.exception
        }
        return outcome
    }

    override val isCancelled: Boolean
        get() = getState() is Cancelled

    override fun tryResume(value: T, onSuccess: ((Any?) -> Unit)?): Boolean {
        while (true) { // lock-free loop on state
            // atomic read; anything other than Active means it cannot be resumed anymore
            val current = getState() as? Active ?: return false
            if (updateState(current, value, onSuccess)) return true
            // the state update lost a race -- re-read the state and retry
        }
    }

    @Suppress("UNCHECKED_CAST")
    override fun afterCompletion(state: Any?) {
        val observed = decision // volatile read
        // If getResult has not decided yet, claim RESUMED and let getResult pick the result up from the state.
        if (observed == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return
        // otherwise, getResult has already commenced, i.e. it was resumed later or in other thread
        when {
            state is CompletedExceptionally ->
                delegate.resumeWithException(state.exception)
            observed == YIELD && delegate is DispatchedContinuation ->
                delegate.resumeYield(parentJob, state as T)
            else ->
                delegate.resume(state as T)
        }
    }

    // can only be invoked in the same thread as getResult (see "yield"), afterCompletion may be concurrent
    fun resumeYield(value: T) {
        val dispatcher = context[ContinuationInterceptor] as? CoroutineDispatcher
        val dispatchNeeded = dispatcher != null && dispatcher.isDispatchNeeded(context)
        if (dispatchNeeded) {
            DECISION.compareAndSet(this, UNDECIDED, YIELD) // try mark as needing dispatch
        }
        resume(value)
    }

    private companion object {
        // Atomic updater for the volatile "decision" field.
        val DECISION: AtomicIntegerFieldUpdater<SafeCancellableContinuation<*>> =
            AtomicIntegerFieldUpdater.newUpdater(SafeCancellableContinuation::class.java, "decision")

        const val UNDECIDED = 0
        const val SUSPENDED = 1
        const val RESUMED = 2
        const val YIELD = 3 // used by cancellable "yield"
    }
}