blob: f5c536eaeedfd365d0dbb44b97d5c1308076a37b [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Roman Elizarov3754f952017-01-18 20:47:54 +030017package kotlinx.coroutines.experimental
18
Roman Elizarov7cf452e2017-01-29 21:58:33 +030019import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
Roman Elizarovea4a51b2017-01-31 12:01:25 +030020import kotlin.coroutines.experimental.Continuation
21import kotlin.coroutines.experimental.ContinuationInterceptor
22import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
23import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
24import kotlin.coroutines.experimental.suspendCoroutine
Roman Elizarov3754f952017-01-18 20:47:54 +030025
// --------------- cancellable continuations ---------------
27
/**
 * A continuation that can be cancelled. It is also a [Job], and that job is completed as soon as the
 * continuation is either resumed or cancelled.
 *
 * An explicit call to [cancel] makes this continuation resume with [CancellationException].
 * When the cancel reason is some other kind of exception, that original exception is attached as the cause
 * of the [CancellationException] that this continuation resumes with.
 */
public interface CancellableContinuation<in T> : Continuation<T>, Job {
    /**
     * `true` when this continuation was cancelled, in which case [isActive] is `false`.
     */
    val isCancelled: Boolean

    /**
     * Attempts to resume this continuation with the given [value].
     *
     * Returns a non-null token object on success, or `null` when the continuation was already
     * resumed or cancelled. A non-null token must be handed over to [completeResume].
     */
    fun tryResume(value: T): Any?

    /**
     * Attempts to resume this continuation with the given [exception].
     *
     * Returns a non-null token object on success, or `null` when the continuation was already
     * resumed or cancelled. A non-null token must be handed over to [completeResume].
     */
    fun tryResumeWithException(exception: Throwable): Any?

    /**
     * Finishes a resumption that was started by [tryResume] or [tryResumeWithException].
     *
     * @param token the non-null object returned by one of those functions.
     */
    fun completeResume(token: Any)

    /**
     * Makes this continuation cancellable. It is meant to be used together with the optional
     * `holdCancellability` parameter of [suspendCancellableCoroutine].
     * It throws [IllegalStateException] if invoked more than once.
     */
    fun initCancellability()
}
Roman Elizarov3754f952017-01-18 20:47:54 +030065
/**
 * Suspends the coroutine like [suspendCoroutine] does, but hands a [CancellableContinuation] to the [block].
 * This function throws [CancellationException] if the coroutine is cancelled while suspended.
 *
 * If the [Job] found in the coroutine context is no longer active, this function throws before
 * [block] is ever invoked.
 *
 * @param holdCancellability when `true`, the coroutine is suspended but does not become cancellable
 * until [CancellableContinuation.initCancellability] is invoked; defaults to `false`.
 * @param block receives the cancellable continuation that must eventually be resumed or cancelled.
 */
public suspend inline fun <T> suspendCancellableCoroutine(
    holdCancellability: Boolean = false,
    crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineOrReturn { uCont ->
    // the parent job lookup may throw here, before any continuation wrapper is allocated
    val cancellable = SafeCancellableContinuation(uCont, getParentJobOrAbort(uCont))
    // initialize cancellability right away unless the caller wants to do it later
    if (!holdCancellability) cancellable.initCancellability()
    block(cancellable)
    // either COROUTINE_SUSPENDED or a result that is already available
    cancellable.getResult()
}
83
// --------------- implementation details ---------------
85
/**
 * Looks up the [Job] element in the context of [cont].
 *
 * @return the job from the context, or `null` when the context has no job.
 * @throws Throwable the result of `getInactiveCancellationException()` when the job is present
 * but is not active anymore.
 */
@PublishedApi
internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
    val parent = cont.context[Job] ?: return null
    // fast path when parent job is already complete (we don't even construct SafeCancellableContinuation object)
    if (!parent.isActive) throw parent.getInactiveCancellationException()
    return parent
}
93
/**
 * Implementation of [CancellableContinuation] that [suspendCancellableCoroutine] creates around the
 * [delegate] continuation it receives from `suspendCoroutineOrReturn`.
 *
 * The `decision` field arbitrates between [getResult] and [afterCompletion]: whichever of them moves it
 * away from `UNDECIDED` first determines whether the outcome is returned directly from [getResult]
 * or is delivered later by resuming the [delegate].
 *
 * @param delegate the continuation that is resumed once this one completes after [getResult] has suspended.
 * @param parentJob the job passed to `initParentJob` by [initCancellability]; may be `null`.
 */
@PublishedApi
internal class SafeCancellableContinuation<in T>(
    private val delegate: Continuation<T>,
    private val parentJob: Job?
) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {
    // Read as a volatile and changed only through DECISION.compareAndSet: getResult and resumeYield update it
    // from the thread that invoked suspendCancellableCoroutine, while afterCompletion may update it concurrently.
    @Volatile
    private var decision = UNDECIDED

    private companion object {
        // the name passed here must match the name of the "decision" field above
        val DECISION: AtomicIntegerFieldUpdater<SafeCancellableContinuation<*>> =
            AtomicIntegerFieldUpdater.newUpdater(SafeCancellableContinuation::class.java, "decision")

        const val UNDECIDED = 0 // initial value: neither getResult nor afterCompletion has made its move
        const val SUSPENDED = 1 // set by getResult when it returns COROUTINE_SUSPENDED
        const val RESUMED = 2 // set by afterCompletion when it runs first; getResult then takes the result from the state
        const val YIELD = 3 // used by cancellable "yield"
    }

    /**
     * Calls `initParentJob` with the [parentJob] given at construction.
     */
    override fun initCancellability() {
        initParentJob(parentJob)
    }

    /**
     * Produces the value for the enclosing `suspendCoroutineOrReturn` block.
     *
     * @return [COROUTINE_SUSPENDED] when the decision was moved from `UNDECIDED` to `SUSPENDED` here or
     * was already `YIELD`; otherwise the current state of this continuation.
     * @throws Throwable the exception of the state when that state is a `CompletedExceptionally`.
     */
    fun getResult(): Any? {
        val decision = this.decision // volatile read
        when (decision) {
            UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
            YIELD -> return COROUTINE_SUSPENDED
        }
        // otherwise, afterCompletion was already invoked, and the result is in the state
        val state = getState()
        if (state is CompletedExceptionally) throw state.exception
        return state
    }

    override val isCancelled: Boolean
        get() = getState() is Cancelled

    /**
     * Tries to move this continuation from an active state to [value].
     *
     * @return the previous active state as a token for [completeResume], or `null` when not active anymore.
     */
    override fun tryResume(value: T): Any? = tryResumeImpl { value }

    /**
     * Tries to move this continuation from an active state to a `Failed` state that wraps [exception].
     *
     * @return the previous active state as a token for [completeResume], or `null` when not active anymore.
     */
    override fun tryResumeWithException(exception: Throwable): Any? = tryResumeImpl { Failed(exception) }

    // Shared lock-free loop of tryResume and tryResumeWithException. The update is produced only while
    // the state is observed as active, exactly like the two separate loops this helper replaces.
    private inline fun tryResumeImpl(update: () -> Any?): Any? {
        while (true) { // lock-free loop on state
            val state = getState() // atomic read
            when (state) {
                is Active -> if (tryUpdateState(state, update())) return state
                else -> return null // cannot resume -- not active anymore
            }
        }
    }

    /**
     * Completes the state update that was started by [tryResume] or [tryResumeWithException].
     *
     * @param token the non-null object returned by one of those functions.
     */
    override fun completeResume(token: Any) {
        completeUpdateState(token, getState())
    }

    /**
     * Delivers the completed [state] to the waiting side.
     *
     * If the decision can still be moved from `UNDECIDED` to `RESUMED`, nothing is resumed here because
     * [getResult] will pick the outcome up from the state. Otherwise the [delegate] is resumed with
     * the exception or with the value.
     */
    @Suppress("UNCHECKED_CAST")
    override fun afterCompletion(state: Any?) {
        val decision = this.decision // volatile read
        if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return // will get result in getResult
        // otherwise, getResult has already commenced, i.e. it was resumed later or in other thread
        when {
            state is CompletedExceptionally -> delegate.resumeWithException(state.exception)
            // NOTE(review): presumably forces a dispatch for cancellable "yield" -- confirm in DispatchedContinuation
            decision == YIELD && delegate is DispatchedContinuation -> delegate.resumeYield(parentJob, state as T)
            else -> delegate.resume(state as T)
        }
    }

    // can only be invoked in the same thread as getResult (see "yield"), afterCompletion may be concurrent
    fun resumeYield(value: T) {
        if ((context[ContinuationInterceptor] as? CoroutineDispatcher)?.isDispatchNeeded(context) == true)
            DECISION.compareAndSet(this, UNDECIDED, YIELD) // try mark as needing dispatch
        resume(value)
    }
}