blob: 397d5908a20085cc906e8cb51ada1f4d744380ab [file] [log] [blame]
Roman Elizarov3754f952017-01-18 20:47:54 +03001package kotlinx.coroutines.experimental
2
3import kotlin.coroutines.Continuation
4import kotlin.coroutines.intrinsics.SUSPENDED_MARKER
5import kotlin.coroutines.intrinsics.suspendCoroutineOrReturn
6import kotlin.coroutines.suspendCoroutine
7
8// --------------- cancellable continuations ---------------
9
/**
 * A [Continuation] that is also a [Job] and can therefore be cancelled.
 *
 * The job of this continuation completes as soon as the continuation is either resumed or cancelled.
 * An explicit call to [cancel] makes the continuation resume with a [CancellationException]; when the
 * reason passed to [cancel] is not itself a [CancellationException], that original exception becomes the
 * cause of the [CancellationException] the continuation resumes with.
 */
public interface CancellableContinuation<in T> : Continuation<T>, Job
17
/**
 * Suspends the coroutine in the same way as [suspendCoroutine], except that the [block] is handed an
 * implementation of [CancellableContinuation] instead of a plain continuation.
 */
public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
    suspendCoroutineOrReturn { continuation ->
        SafeCancellableContinuation(continuation).let { cancellable ->
            // run the user block first; it may resume or cancel the continuation right away
            block(cancellable)
            // then report either the suspension marker or the outcome that is already available
            cancellable.getResult()
        }
    }
28
29// --------------- implementation details ---------------
30
/**
 * [CancellableContinuation] implementation created by [suspendCancellableCoroutine] around the [delegate]
 * continuation.
 *
 * The outcome is either handed back through [getResult] (when completion was observed on the creating thread
 * before [getResult] ran) or delivered by resuming the [delegate] (in every other case).
 */
@PublishedApi
internal class SafeCancellableContinuation<in T>(
    private val delegate: Continuation<T>
) : JobContinuation<T>(delegate.context), CancellableContinuation<T> {
    // Initialised to the creating thread; it is only ever written by the thread that invoked
    // suspendCancellableCoroutine, and is cleared by whichever of getResult / afterCompletion runs first there.
    private var suspendedThread: Thread? = Thread.currentThread()

    /**
     * Returns [SUSPENDED_MARKER] (and clears the thread marker) while the marker is still set.
     * Once the marker has been cleared, returns the state obtained from `getState()`, or throws the
     * exception it carries when that state is a [CompletedExceptionally].
     */
    fun getResult(): Any? {
        if (suspendedThread == null) {
            // marker already consumed: the outcome is read from the state
            val completed = getState()
            if (completed is CompletedExceptionally) throw completed.exception
            return completed
        }
        suspendedThread = null
        return SUSPENDED_MARKER
    }

    /**
     * Reports a non-null [closeException] via `handleCoroutineException`, then either clears the thread marker
     * (when invoked on the thread recorded in it) or resumes the [delegate] with the outcome held in [state].
     */
    @Suppress("UNCHECKED_CAST")
    override fun afterCompletion(state: Any?, closeException: Throwable?) {
        closeException?.let { handleCoroutineException(context, it) }
        if (suspendedThread !== Thread.currentThread()) {
            // completed later or in another thread: deliver the outcome through the delegate
            when (state) {
                is CompletedExceptionally -> delegate.resumeWithException(state.exception)
                else -> delegate.resume(state as T)
            }
            return
        }
        // completed during suspendCancellableCoroutine in its own thread:
        // clearing the marker makes getResult return the state instead of SUSPENDED_MARKER
        suspendedThread = null
    }
}