blob: 9723585dced1b69968e6f163ad76e38a6a48ee50 [file] [log] [blame]
Roman Elizarov3754f952017-01-18 20:47:54 +03001package kotlinx.coroutines.experimental
2
3import kotlin.coroutines.Continuation
4import kotlin.coroutines.intrinsics.SUSPENDED_MARKER
5import kotlin.coroutines.intrinsics.suspendCoroutineOrReturn
6import kotlin.coroutines.suspendCoroutine
7
8// --------------- cancellable continuations ---------------
9
10/**
11 * Cancellable continuation. Its job is completed when it is resumed or cancelled.
12 * When [cancel] function is explicitly invoked, this continuation resumes with [CancellationException].
13 * If the cancel reason was not a [CancellationException], then the original exception is added as cause of the
14 * [CancellationException] that this continuation resumes with.
15 */
16public interface CancellableContinuation<in T> : Continuation<T>, Job
17
18/**
19 * Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
20 * the [block].
21 */
22public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
23 suspendCoroutineOrReturn { c ->
24 val safe = SafeCancellableContinuation(c)
25 block(safe)
26 safe.getResult()
27 }
28
29// --------------- implementation details ---------------
30
31@PublishedApi
32internal class SafeCancellableContinuation<in T>(
33 private val delegate: Continuation<T>
Roman Elizarov53a0a402017-01-19 20:21:57 +030034) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {
Roman Elizarov3754f952017-01-18 20:47:54 +030035 // only updated from the thread that invoked suspendCancellableCoroutine
36 private var suspendedThread: Thread? = Thread.currentThread()
37
38 fun getResult(): Any? {
39 if (suspendedThread != null) {
40 suspendedThread = null
41 return SUSPENDED_MARKER
42 }
43 val state = getState()
44 if (state is CompletedExceptionally) throw state.exception
45 return state
46 }
47
48 @Suppress("UNCHECKED_CAST")
Roman Elizarov53a0a402017-01-19 20:21:57 +030049 override fun afterCompletion(state: Any?) {
Roman Elizarov3754f952017-01-18 20:47:54 +030050 if (suspendedThread === Thread.currentThread()) {
51 // cancelled during suspendCancellableCoroutine in its thread
52 suspendedThread = null
53 } else {
54 // cancelled later or in other thread
55 if (state is CompletedExceptionally)
56 delegate.resumeWithException(state.exception)
57 else
58 delegate.resume(state as T)
59 }
60 }
61}