blob: 5a1c76a71e5b7951258ae7a7582b6058e42b2317 [file] [log] [blame]
Roman Elizarov3754f952017-01-18 20:47:54 +03001package kotlinx.coroutines.experimental
2
3import kotlin.coroutines.Continuation
4import kotlin.coroutines.intrinsics.SUSPENDED_MARKER
5import kotlin.coroutines.intrinsics.suspendCoroutineOrReturn
6import kotlin.coroutines.suspendCoroutine
7
// --------------- cancellable continuations ---------------
9
/**
 * A [Continuation] that is also a [Job] and can therefore be cancelled.
 * The job is considered complete once this continuation has been either resumed or cancelled.
 *
 * Explicitly invoking [cancel] makes this continuation resume with a [CancellationException].
 * When the supplied cancel reason is not itself a [CancellationException], the original exception
 * is attached as the cause of the [CancellationException] that this continuation resumes with.
 */
public interface CancellableContinuation<in T> : Continuation<T>, Job {
    /**
     * `true` once this continuation has been cancelled, which implies that [isActive] is `false`.
     */
    val isCancelled: Boolean
}
Roman Elizarov3754f952017-01-18 20:47:54 +030022
/**
 * Suspends the calling coroutine in the same manner as [suspendCoroutine], except that [block] is handed an
 * implementation of [CancellableContinuation]. Throws [CancellationException] if the coroutine is cancelled
 * while it is suspended.
 */
public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
    suspendCoroutineOrReturn { cont ->
        // Looks up the parent job first; this call throws when that job is already inactive.
        val parentJob = getParentJobOrAbort(cont)
        val cancellable = SafeCancellableContinuation(cont, parentJob)
        block(cancellable)
        // Either the suspension marker or the already-available outcome.
        cancellable.getResult()
    }
33
// --------------- implementation details ---------------
35
/**
 * Looks up the [Job] element in the context of [cont].
 *
 * Returns that job, or `null` when the context has none. Throws [CancellationException] when a job is
 * present but no longer active, so that the caller can bail out before allocating a
 * [SafeCancellableContinuation].
 */
@PublishedApi
internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
    // No job in the context: nothing to check and nothing to attach to.
    val parent = cont.context[Job] ?: return null
    // Fast path: the parent job is already complete, so abort immediately.
    if (!parent.isActive) throw CancellationException()
    return parent
}
43
/**
 * [CancellableContinuation] implementation that wraps [delegate] and is attached to [parentJob] on construction.
 *
 * It remembers the thread that constructed it in [suspendedThread]. While that field is still set, a completion
 * observed on the same thread is not forwarded to [delegate]; the outcome is instead picked up by [getResult].
 * In every other case the completion is forwarded to [delegate].
 */
@PublishedApi
internal class SafeCancellableContinuation<in T>(
    private val delegate: Continuation<T>,
    parentJob: Job?
) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {
    // Only ever written from the thread that invoked suspendCancellableCoroutine.
    // Must be initialized before the init block below attaches the parent job.
    private var suspendedThread: Thread? = Thread.currentThread()

    init {
        initParentJob(parentJob)
    }

    /**
     * Returns [SUSPENDED_MARKER] (clearing [suspendedThread]) while the thread marker is still set.
     * Otherwise returns the current state, or throws its exception when the state is [CompletedExceptionally].
     */
    fun getResult(): Any? {
        if (suspendedThread == null) {
            // The marker was already cleared by afterCompletion: the outcome is available right now.
            val outcome = getState()
            if (outcome is CompletedExceptionally) throw outcome.exception
            return outcome
        }
        // Not completed yet from this thread's point of view: report suspension.
        suspendedThread = null
        return SUSPENDED_MARKER
    }

    override val isCancelled: Boolean
        get() {
            val current = getState()
            return current is Cancelled
        }

    /**
     * Handles the final [state]: swallows it when running on the thread recorded in [suspendedThread]
     * (so that [getResult] reports it), otherwise resumes [delegate] with the value or the exception.
     */
    @Suppress("UNCHECKED_CAST")
    override fun afterCompletion(state: Any?) {
        if (suspendedThread === Thread.currentThread()) {
            // Completed during suspendCancellableCoroutine, in its own thread.
            suspendedThread = null
            return
        }
        // Completed later or from another thread: hand the outcome to the wrapped continuation.
        when (state) {
            is CompletedExceptionally -> delegate.resumeWithException(state.exception)
            else -> delegate.resume(state as T)
        }
    }
}
80}