blob: 2895b4c0127454d403713834611422d1602e0899 [file] [log] [blame]
Roman Elizarov3754f952017-01-18 20:47:54 +03001package kotlinx.coroutines.experimental
2
3import kotlin.coroutines.Continuation
4import kotlin.coroutines.intrinsics.SUSPENDED_MARKER
5import kotlin.coroutines.intrinsics.suspendCoroutineOrReturn
6import kotlin.coroutines.suspendCoroutine
7
// --------------- cancellable continuations ---------------
9
10/**
11 * Cancellable continuation. Its job is completed when it is resumed or cancelled.
12 * When [cancel] function is explicitly invoked, this continuation resumes with [CancellationException].
13 * If the cancel reason was not a [CancellationException], then the original exception is added as cause of the
14 * [CancellationException] that this continuation resumes with.
15 */
16public interface CancellableContinuation<in T> : Continuation<T>, Job
17
18/**
19 * Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
Roman Elizarov58a7add2017-01-20 12:19:52 +030020 * the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
Roman Elizarov3754f952017-01-18 20:47:54 +030021 */
22public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
Roman Elizarov58a7add2017-01-20 12:19:52 +030023 suspendCoroutineOrReturn { cont ->
24 val safe = SafeCancellableContinuation(cont, getParentJobOrAbort(cont))
Roman Elizarov3754f952017-01-18 20:47:54 +030025 block(safe)
26 safe.getResult()
27 }
28
// --------------- implementation details ---------------
30
31@PublishedApi
Roman Elizarov58a7add2017-01-20 12:19:52 +030032internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
33 val job = cont.context[Job]
34 // fast path when parent job is already complete (we don't even construct SafeCancellableContinuation object)
35 job?.isActive?.let { if (!it) throw CancellationException() }
36 return job
37}
38
39@PublishedApi
Roman Elizarov3754f952017-01-18 20:47:54 +030040internal class SafeCancellableContinuation<in T>(
Roman Elizarov58a7add2017-01-20 12:19:52 +030041 private val delegate: Continuation<T>,
42 parentJob: Job?
Roman Elizarov53a0a402017-01-19 20:21:57 +030043) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {
Roman Elizarov3754f952017-01-18 20:47:54 +030044 // only updated from the thread that invoked suspendCancellableCoroutine
45 private var suspendedThread: Thread? = Thread.currentThread()
46
Roman Elizarov58a7add2017-01-20 12:19:52 +030047 init { initParentJob(parentJob) }
48
Roman Elizarov3754f952017-01-18 20:47:54 +030049 fun getResult(): Any? {
50 if (suspendedThread != null) {
51 suspendedThread = null
52 return SUSPENDED_MARKER
53 }
54 val state = getState()
55 if (state is CompletedExceptionally) throw state.exception
56 return state
57 }
58
59 @Suppress("UNCHECKED_CAST")
Roman Elizarov53a0a402017-01-19 20:21:57 +030060 override fun afterCompletion(state: Any?) {
Roman Elizarov3754f952017-01-18 20:47:54 +030061 if (suspendedThread === Thread.currentThread()) {
62 // cancelled during suspendCancellableCoroutine in its thread
63 suspendedThread = null
64 } else {
65 // cancelled later or in other thread
66 if (state is CompletedExceptionally)
67 delegate.resumeWithException(state.exception)
68 else
69 delegate.resume(state as T)
70 }
71 }
72}