blob: 18af647c45633b6954f85201dc30da20d91ef6c8 [file] [log] [blame]
Roman Elizarov3754f952017-01-18 20:47:54 +03001package kotlinx.coroutines.experimental
2
Roman Elizarov7cf452e2017-01-29 21:58:33 +03003import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
Roman Elizarovea4a51b2017-01-31 12:01:25 +03004import kotlin.coroutines.experimental.Continuation
5import kotlin.coroutines.experimental.ContinuationInterceptor
6import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
7import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
8import kotlin.coroutines.experimental.suspendCoroutine
Roman Elizarov3754f952017-01-18 20:47:54 +03009
10// --------------- cancellable continuations ---------------
11
12/**
13 * Cancellable continuation. Its job is completed when it is resumed or cancelled.
14 * When [cancel] function is explicitly invoked, this continuation resumes with [CancellationException].
15 * If the cancel reason was not a [CancellationException], then the original exception is added as cause of the
16 * [CancellationException] that this continuation resumes with.
17 */
Roman Elizarov834af462017-01-23 20:50:29 +030018public interface CancellableContinuation<in T> : Continuation<T>, Job {
19 /**
20 * Returns `true` if this continuation was cancelled. It implies that [isActive] is `false`.
21 */
22 val isCancelled: Boolean
Roman Elizarov187eace2017-01-31 09:39:58 +030023
24 /**
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030025 * Tries to resume this continuation with a given value and returns non-null object token if it was successful,
26 * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
27 * [completeResume] must be invoked with it.
Roman Elizarov187eace2017-01-31 09:39:58 +030028 */
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030029 fun tryResume(value: T): Any?
30
31 /**
Roman Elizarovf6fed2a2017-02-03 19:12:09 +030032 * Tries to resume this continuation with a given exception and returns non-null object token if it was successful,
33 * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
34 * [completeResume] must be invoked with it.
35 */
36 fun tryResumeWithException(exception: Throwable): Any?
37
38 /**
39 * Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030040 */
41 fun completeResume(token: Any)
Roman Elizarov187eace2017-01-31 09:39:58 +030042
43 /**
44 * Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
45 * [suspendCancellableCoroutine] function. It throws [IllegalStateException] if invoked more than once.
46 */
47 fun initCancellability()
Roman Elizarov834af462017-01-23 20:50:29 +030048}
Roman Elizarov3754f952017-01-18 20:47:54 +030049
50/**
51 * Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
Roman Elizarov58a7add2017-01-20 12:19:52 +030052 * the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
Roman Elizarov187eace2017-01-31 09:39:58 +030053 *
54 * If [holdCancellability] optional parameter is `true`, then the coroutine is suspended, but it is not
55 * cancellable until [CancellableContinuation.initCancellability] is invoked.
Roman Elizarov3754f952017-01-18 20:47:54 +030056 */
Roman Elizarov187eace2017-01-31 09:39:58 +030057public inline suspend fun <T> suspendCancellableCoroutine(
58 holdCancellability: Boolean = false,
59 crossinline block: (CancellableContinuation<T>) -> Unit
60): T =
Roman Elizarov58a7add2017-01-20 12:19:52 +030061 suspendCoroutineOrReturn { cont ->
62 val safe = SafeCancellableContinuation(cont, getParentJobOrAbort(cont))
Roman Elizarov187eace2017-01-31 09:39:58 +030063 if (!holdCancellability) safe.initCancellability()
Roman Elizarov3754f952017-01-18 20:47:54 +030064 block(safe)
65 safe.getResult()
66 }
67
68// --------------- implementation details ---------------
69
70@PublishedApi
Roman Elizarov58a7add2017-01-20 12:19:52 +030071internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
72 val job = cont.context[Job]
73 // fast path when parent job is already complete (we don't even construct SafeCancellableContinuation object)
Roman Elizarov7cf452e2017-01-29 21:58:33 +030074 if (job != null && !job.isActive) throw job.getInactiveCancellationException()
Roman Elizarov58a7add2017-01-20 12:19:52 +030075 return job
76}
77
78@PublishedApi
Roman Elizarov3754f952017-01-18 20:47:54 +030079internal class SafeCancellableContinuation<in T>(
Roman Elizarov58a7add2017-01-20 12:19:52 +030080 private val delegate: Continuation<T>,
Roman Elizarov7cf452e2017-01-29 21:58:33 +030081 private val parentJob: Job?
Roman Elizarov53a0a402017-01-19 20:21:57 +030082) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {
Roman Elizarov3754f952017-01-18 20:47:54 +030083 // only updated from the thread that invoked suspendCancellableCoroutine
Roman Elizarov7cf452e2017-01-29 21:58:33 +030084
85 @Volatile
86 private var decision = UNDECIDED
87
88 private companion object {
89 val DECISION: AtomicIntegerFieldUpdater<SafeCancellableContinuation<*>> =
90 AtomicIntegerFieldUpdater.newUpdater(SafeCancellableContinuation::class.java, "decision")
91
92 const val UNDECIDED = 0
93 const val SUSPENDED = 1
94 const val RESUMED = 2
95 const val YIELD = 3 // used by cancellable "yield"
96 }
Roman Elizarov3754f952017-01-18 20:47:54 +030097
Roman Elizarov187eace2017-01-31 09:39:58 +030098 override fun initCancellability() {
99 initParentJob(parentJob)
100 }
Roman Elizarov58a7add2017-01-20 12:19:52 +0300101
Roman Elizarov3754f952017-01-18 20:47:54 +0300102 fun getResult(): Any? {
Roman Elizarov7cf452e2017-01-29 21:58:33 +0300103 val decision = this.decision // volatile read
104 when (decision) {
Roman Elizarovea4a51b2017-01-31 12:01:25 +0300105 UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
106 YIELD -> return COROUTINE_SUSPENDED
Roman Elizarov3754f952017-01-18 20:47:54 +0300107 }
Roman Elizarov7cf452e2017-01-29 21:58:33 +0300108 // otherwise, afterCompletion was already invoked, and the result is in the state
Roman Elizarov3754f952017-01-18 20:47:54 +0300109 val state = getState()
110 if (state is CompletedExceptionally) throw state.exception
111 return state
112 }
113
Roman Elizarov834af462017-01-23 20:50:29 +0300114 override val isCancelled: Boolean
115 get() = getState() is Cancelled
116
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300117 override fun tryResume(value: T): Any? {
Roman Elizarov187eace2017-01-31 09:39:58 +0300118 while (true) { // lock-free loop on state
119 val state = getState() // atomic read
120 when (state) {
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300121 is Active -> if (tryUpdateState(state, value)) return state
122 else -> return null // cannot resume -- not active anymore
Roman Elizarov187eace2017-01-31 09:39:58 +0300123 }
124 }
125 }
126
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300127 override fun tryResumeWithException(exception: Throwable): Any? {
128 while (true) { // lock-free loop on state
129 val state = getState() // atomic read
130 when (state) {
131 is Active -> if (tryUpdateState(state, Failed(exception))) return state
132 else -> return null // cannot resume -- not active anymore
133 }
134 }
135 }
136
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300137 override fun completeResume(token: Any) {
138 completeUpdateState(token, getState())
139 }
140
Roman Elizarov3754f952017-01-18 20:47:54 +0300141 @Suppress("UNCHECKED_CAST")
Roman Elizarov53a0a402017-01-19 20:21:57 +0300142 override fun afterCompletion(state: Any?) {
Roman Elizarov7cf452e2017-01-29 21:58:33 +0300143 val decision = this.decision // volatile read
144 if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return // will get result in getResult
145 // otherwise, getResult has already commenced, i.e. it was resumed later or in other thread
146 if (state is CompletedExceptionally)
147 delegate.resumeWithException(state.exception)
148 else if (decision == YIELD && delegate is DispatchedContinuation)
149 delegate.resumeYield(parentJob, state as T)
150 else
151 delegate.resume(state as T)
152 }
153
154 // can only be invoked in the same thread as getResult (see "yield"), afterCompletion may be concurrent
155 fun resumeYield(value: T) {
156 if ((context[ContinuationInterceptor] as? CoroutineDispatcher)?.isDispatchNeeded(context) == true)
157 DECISION.compareAndSet(this, UNDECIDED, YIELD) // try mark as needing dispatch
158 resume(value)
Roman Elizarov3754f952017-01-18 20:47:54 +0300159 }
160}