blob: 1b1adad55c96bf70758f4953e85cbd75f1133ad6 [file] [log] [blame]
Roman Elizarov3754f952017-01-18 20:47:54 +03001package kotlinx.coroutines.experimental
2
Roman Elizarov7cf452e2017-01-29 21:58:33 +03003import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
Roman Elizarovea4a51b2017-01-31 12:01:25 +03004import kotlin.coroutines.experimental.Continuation
5import kotlin.coroutines.experimental.ContinuationInterceptor
6import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
7import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
8import kotlin.coroutines.experimental.suspendCoroutine
Roman Elizarov3754f952017-01-18 20:47:54 +03009
// --------------- cancellable continuations ---------------
11
/**
 * A [Continuation] that can be cancelled and that is also a [Job]; the job completes once the
 * continuation is either resumed or cancelled.
 *
 * An explicit call to [cancel] makes this continuation resume with a [CancellationException]. When the
 * reason passed to cancel is not itself a [CancellationException], that original exception becomes the
 * cause of the [CancellationException] that is delivered.
 */
public interface CancellableContinuation<in T> : Continuation<T>, Job {
    /**
     * `true` once this continuation has been cancelled; in that case [isActive] is `false` as well.
     */
    val isCancelled: Boolean

    /**
     * Attempts to resume this continuation with [value].
     *
     * @return a non-null token when the attempt succeeded, or `null` when the continuation had
     * already been resumed or cancelled. A non-null token obliges the caller to pass it to
     * [completeResume].
     */
    fun tryResume(value: T): Any?

    /**
     * Finishes a resumption that was started by [tryResume].
     *
     * @param token the non-null result that [tryResume] returned.
     */
    fun completeResume(token: Any)

    /**
     * Turns cancellation support on for this continuation. It is meant to be used together with the
     * optional `holdCancellability` parameter of [suspendCancellableCoroutine].
     *
     * @throws IllegalStateException if it is invoked more than once.
     */
    fun initCancellability()
}
Roman Elizarov3754f952017-01-18 20:47:54 +030042
/**
 * Suspends the coroutine in the same manner as [suspendCoroutine], except that [block] receives a
 * [CancellableContinuation]. A [CancellationException] is thrown when the coroutine gets cancelled
 * while it is suspended here.
 *
 * @param holdCancellability when `true`, the coroutine still suspends but stays non-cancellable until
 * [CancellableContinuation.initCancellability] is called; when `false` (the default), cancellability
 * is initialized before [block] runs.
 * @param block receives the cancellable continuation to resume later.
 */
public inline suspend fun <T> suspendCancellableCoroutine(
    holdCancellability: Boolean = false,
    crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineOrReturn { rawContinuation ->
    // The parent job is looked up (and checked for being active) before the wrapper is allocated.
    val parent = getParentJobOrAbort(rawContinuation)
    val cancellable = SafeCancellableContinuation(rawContinuation, parent)
    if (!holdCancellability) {
        cancellable.initCancellability()
    }
    block(cancellable)
    // Yields either COROUTINE_SUSPENDED or the result when the block has already resumed.
    cancellable.getResult()
}
60
// --------------- implementation details ---------------
62
/**
 * Looks up the [Job] in the context of [cont].
 *
 * This is the fast path for an already inactive parent: the exception is thrown here, before any
 * SafeCancellableContinuation object is constructed.
 *
 * @return the job found in the context, or `null` when the context has no job.
 * @throws Throwable the result of `getInactiveCancellationException()` when the job is not active.
 */
@PublishedApi
internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
    val parent = cont.context[Job] ?: return null
    if (parent.isActive) return parent
    throw parent.getInactiveCancellationException()
}
70
/**
 * Implementation of [CancellableContinuation] that wraps [delegate] and is used by
 * [suspendCancellableCoroutine].
 *
 * The [decision] field arbitrates between [getResult] and [afterCompletion], which may run
 * concurrently: whichever of them wins the compare-and-set from `UNDECIDED` determines whether the
 * result is returned directly from [getResult] or delivered by resuming [delegate].
 */
@PublishedApi
internal class SafeCancellableContinuation<in T>(
    private val delegate: Continuation<T>,
    private val parentJob: Job?
) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {

    // Modified only through DECISION compare-and-set operations; read as a volatile field.
    // The property name must stay "decision" because DECISION looks it up by that name.
    @Volatile
    private var decision = UNDECIDED

    override val isCancelled: Boolean
        get() = getState() is Cancelled

    override fun initCancellability() {
        initParentJob(parentJob)
    }

    /**
     * Returns `COROUTINE_SUSPENDED` when the caller has to suspend, otherwise the completed result.
     *
     * @throws Throwable the stored exception when the state is [CompletedExceptionally].
     */
    fun getResult(): Any? {
        val observed = decision // volatile read
        // YIELD was set by resumeYield: suspend and let the delegate be resumed via dispatch.
        if (observed == YIELD) return COROUTINE_SUSPENDED
        // Still undecided: claim the "suspended" outcome unless completion claims RESUMED first.
        if (observed == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) {
            return COROUTINE_SUSPENDED
        }
        // Otherwise afterCompletion was already invoked and the result sits in the state.
        val outcome = getState()
        if (outcome is CompletedExceptionally) throw outcome.exception
        return outcome
    }

    override fun tryResume(value: T): Any? {
        // Lock-free retry loop over the state.
        while (true) {
            val current = getState() // atomic read
            if (current !is Active) return null // cannot resume -- not active anymore
            // The state that was replaced serves as the token for completeResume.
            if (tryUpdateState(current, value)) return current
        }
    }

    override fun completeResume(token: Any) {
        val completed = getState()
        completeUpdateState(token, completed)
    }

    /**
     * Completion hook: either leaves the result for [getResult] to pick up, or resumes [delegate].
     */
    @Suppress("UNCHECKED_CAST")
    override fun afterCompletion(state: Any?) {
        val observed = decision // volatile read
        // Completed before getResult decided anything: getResult will fetch the result itself.
        if (observed == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return
        // Otherwise getResult has already commenced, i.e. resumption came later or from another thread.
        when {
            state is CompletedExceptionally ->
                delegate.resumeWithException(state.exception)
            observed == YIELD && delegate is DispatchedContinuation ->
                delegate.resumeYield(parentJob, state as T)
            else ->
                delegate.resume(state as T)
        }
    }

    /**
     * Resumes with [value] on behalf of the cancellable "yield".
     *
     * It can only be invoked in the same thread as [getResult]; [afterCompletion] may still run
     * concurrently.
     */
    fun resumeYield(value: T) {
        val dispatcher = context[ContinuationInterceptor] as? CoroutineDispatcher
        if (dispatcher != null && dispatcher.isDispatchNeeded(context)) {
            DECISION.compareAndSet(this, UNDECIDED, YIELD) // try to mark as needing dispatch
        }
        resume(value)
    }

    private companion object {
        val DECISION: AtomicIntegerFieldUpdater<SafeCancellableContinuation<*>> =
            AtomicIntegerFieldUpdater.newUpdater(SafeCancellableContinuation::class.java, "decision")

        const val UNDECIDED = 0
        const val SUSPENDED = 1
        const val RESUMED = 2
        const val YIELD = 3 // used by cancellable "yield"
    }
}
143}