blob: 5e81b2b090f7e3c273a7fcf869f3d63b81764121 [file] [log] [blame]
Roman Elizarov2b12d582017-06-22 20:12:19 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov2b12d582017-06-22 20:12:19 +03003 */
4
5package kotlinx.coroutines.experimental
6
Roman Elizarovaa461cf2018-04-11 13:20:29 +03007import kotlinx.atomicfu.*
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +03008import kotlinx.coroutines.experimental.internalAnnotations.*
Roman Elizarovaa461cf2018-04-11 13:20:29 +03009import kotlin.coroutines.experimental.*
10import kotlin.coroutines.experimental.intrinsics.*
Roman Elizarov7753f8e2017-08-15 11:16:33 +030011
12private const val UNDECIDED = 0
13private const val SUSPENDED = 1
14private const val RESUMED = 2
15
Roman Elizarov2b12d582017-06-22 20:12:19 +030016/**
17 * @suppress **This is unstable API and it is subject to change.**
18 */
19internal abstract class AbstractContinuation<in T>(
Roman Elizarovf2239e12018-01-10 16:25:25 +030020 public final override val delegate: Continuation<T>,
21 public final override val resumeMode: Int
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030022) : Continuation<T>, DispatchedTask<T> {
23
24 /*
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +030025 * Implementation notes
26 *
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030027 * AbstractContinuation is a subset of Job with following limitations:
28 * 1) It can have only cancellation listeners
29 * 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
30 * 3) It can have at most one cancellation listener
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +030031 * 4) Its cancellation listeners cannot be deregistered
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030032 * As a consequence it has much simpler state machine, more lightweight machinery and
33 * less dependencies.
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +030034 *
35 * Cancelling state
36 * If useCancellingState is true, then this continuation can have additional cancelling state,
37 * which is transition from Active to Cancelled. This is specific state to support withContext(ctx)
38 * construction: block in withContext can be cancelled from withing or even before stepping into withContext,
39 * but we still want to properly run it (e.g. when it has atomic cancellation mode) and run its completion listener
40 * after.
41 * During cancellation all pending exceptions are aggregated and thrown during transition to final state
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030042 */
Roman Elizarov2b12d582017-06-22 20:12:19 +030043
44 /* decision state machine
45
46 +-----------+ trySuspend +-----------+
47 | UNDECIDED | -------------> | SUSPENDED |
48 +-----------+ +-----------+
49 |
50 | tryResume
51 V
52 +-----------+
53 | RESUMED |
54 +-----------+
55
56 Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
57 */
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030058 private val _decision = atomic(UNDECIDED)
59
60 /*
61 === Internal states ===
62 name state class public state description
63 ------ ------------ ------------ -----------
64 ACTIVE Active : Active active, no listeners
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +030065 SINGLE_A CancelHandler : Active active, one cancellation listener
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +030066 CANCELLING Cancelling : Active in the process of cancellation due to cancellation of parent job
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030067 CANCELLED Cancelled : Cancelled cancelled (final state)
68 COMPLETED any : Completed produced some result or threw an exception (final state)
69 */
70 private val _state = atomic<Any?>(ACTIVE)
71
72 @Volatile
73 private var parentHandle: DisposableHandle? = null
74
75 internal val state: Any? get() = _state.value
76
77 public val isActive: Boolean get() = state is NotCompleted
78
79 public val isCompleted: Boolean get() = state !is NotCompleted
80
81 public val isCancelled: Boolean get() = state is CancelledContinuation
82
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +030083 protected open val useCancellingState: Boolean get() = false
84
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030085 internal fun initParentJobInternal(parent: Job?) {
86 check(parentHandle == null)
87 if (parent == null) {
88 parentHandle = NonDisposableHandle
89 return
90 }
91 parent.start() // make sure the parent is started
Roman Elizarov6d9f40f2018-04-28 14:44:02 +030092 val handle = parent.invokeOnCompletion(onCancelling = true,
93 handler = ChildContinuation(parent, this).asHandler)
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030094
95 parentHandle = handle
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +030096 // now check our state _after_ registering (see updateStateToFinal order of actions)
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030097 if (isCompleted) {
98 handle.dispose()
99 parentHandle = NonDisposableHandle // release it just in case, to aid GC
100 }
101 }
Roman Elizarov2b12d582017-06-22 20:12:19 +0300102
Roman Elizarovf2239e12018-01-10 16:25:25 +0300103 override fun takeState(): Any? = state
104
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300105 public fun cancel(cause: Throwable?): Boolean {
106 loopOnState { state ->
107 if (state !is NotCompleted) return false // quit if already complete
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300108 if (state is Cancelling) return false // someone else succeeded
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300109 if (tryCancel(state, cause)) return true
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300110 }
111 }
112
Roman Elizarovbcdd8e12017-10-20 16:42:06 +0800113 private fun trySuspend(): Boolean {
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300114 _decision.loop { decision ->
Roman Elizarov2b12d582017-06-22 20:12:19 +0300115 when (decision) {
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300116 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
Roman Elizarov2b12d582017-06-22 20:12:19 +0300117 RESUMED -> return false
118 else -> error("Already suspended")
119 }
120 }
121 }
122
Roman Elizarovbcdd8e12017-10-20 16:42:06 +0800123 private fun tryResume(): Boolean {
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300124 _decision.loop { decision ->
Roman Elizarov2b12d582017-06-22 20:12:19 +0300125 when (decision) {
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300126 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
Roman Elizarov2b12d582017-06-22 20:12:19 +0300127 SUSPENDED -> return false
128 else -> error("Already resumed")
129 }
130 }
131 }
132
Roman Elizarovbcdd8e12017-10-20 16:42:06 +0800133 @PublishedApi
134 internal fun getResult(): Any? {
135 if (trySuspend()) return COROUTINE_SUSPENDED
Roman Elizarovebc88662018-01-24 23:58:56 +0300136 // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
Roman Elizarovbcdd8e12017-10-20 16:42:06 +0800137 val state = this.state
Vsevolod Tolstopyatovc1092d52018-04-12 20:22:25 +0300138 if (state is CompletedExceptionally) throw state.cause
Roman Elizarovbcdd8e12017-10-20 16:42:06 +0800139 return getSuccessfulResult(state)
140 }
Roman Elizarov2b12d582017-06-22 20:12:19 +0300141
Roman Elizarovbcdd8e12017-10-20 16:42:06 +0800142 override fun resume(value: T) =
143 resumeImpl(value, resumeMode)
144
145 override fun resumeWithException(exception: Throwable) =
146 resumeImpl(CompletedExceptionally(exception), resumeMode)
147
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300148 public fun invokeOnCancellation(handler: CompletionHandler) {
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300149 var handleCache: CancelHandler? = null
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300150 loopOnState { state ->
Roman Elizarov2b12d582017-06-22 20:12:19 +0300151 when (state) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300152 is Active -> {
153 val node = handleCache ?: makeHandler(handler).also { handleCache = it }
154 if (_state.compareAndSet(state, node)) {
155 return
156 }
Roman Elizarov2b12d582017-06-22 20:12:19 +0300157 }
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300158 is CancelHandler -> error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300159 is CancelledContinuation -> {
160 /*
161 * Continuation is complete, invoke directly.
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300162 * NOTE: multiple invokeOnCancellation calls with different handlers are allowed on cancelled continuation.
163 * It's inconsistent with running continuation, but currently, we have no mechanism to check
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300164 * whether any handler was registered during continuation lifecycle without additional overhead.
165 * This may be changed in the future.
166 *
167 * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
168 * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
169 */
170 handler.invokeIt((state as? CompletedExceptionally)?.cause)
Roman Elizarov2b12d582017-06-22 20:12:19 +0300171 return
172 }
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300173 is Cancelling -> error("Cancellation handlers for continuations with 'Cancelling' state are not supported")
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300174 else -> return
Roman Elizarov2b12d582017-06-22 20:12:19 +0300175 }
176 }
177 }
178
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300179 private fun makeHandler(handler: CompletionHandler): CancelHandler =
180 if (handler is CancelHandler) handler else InvokeOnCancel(handler)
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300181
182 private fun tryCancel(state: NotCompleted, cause: Throwable?): Boolean {
183 if (useCancellingState) {
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300184 require(state !is CancelHandler) { "Invariant: 'Cancelling' state and cancellation handlers cannot be used together" }
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300185 return _state.compareAndSet(state, Cancelling(CancelledContinuation(this, cause)))
186 }
187
188 return updateStateToFinal(state, CancelledContinuation(this, cause), mode = MODE_ATOMIC_DEFAULT)
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300189 }
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300190
191 private fun onCompletionInternal(mode: Int) {
192 if (tryResume()) return // completed before getResult invocation -- bail out
193 // otherwise, getResult has already commenced, i.e. completed later or in other thread
194 dispatch(mode)
195 }
196
197 protected inline fun loopOnState(block: (Any?) -> Unit): Nothing {
198 while (true) {
199 block(state)
200 }
201 }
202
203 protected fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) {
204 loopOnState { state ->
205 when (state) {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300206 is Cancelling -> { // withContext() support
207 /*
208 * If already cancelled block is resumed with non-exception,
209 * resume it with cancellation exception.
210 * E.g.
211 * ```
212 * val value = withContext(ctx) {
213 * outerJob.cancel() // -> cancelling
214 * 42 // -> cancelled
215 * }
216 * ```
217 * should throw cancellation exception instead of returning 42
218 */
219 if (proposedUpdate !is CompletedExceptionally) {
220 val update = state.cancel
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300221 if (updateStateToFinal(state, update, resumeMode)) return
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300222 } else {
223 /*
224 * If already cancelled block is resumed with an exception,
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +0300225 * then we should properly merge them to avoid information loss.
226 *
227 * General rule:
228 * Thrown exception always becomes a result and cancellation reason
229 * is added to suppressed exceptions if necessary.
230 * Basic duplicate/cycles check is performed
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300231 */
232 val update: CompletedExceptionally
233
234 /*
235 * Proposed update is another CancellationException.
236 * e.g.
237 * ```
238 * T1: ctxJob.cancel(e1) // -> cancelling
239 * T2:
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +0300240 * withContext(ctx, Mode.ATOMIC) {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300241 * // -> resumed with cancellation exception
242 * }
243 * ```
244 */
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300245 if (proposedUpdate.cause is CancellationException) {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300246 // Keep original cancellation cause and try add to suppressed exception from proposed cancel
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +0300247 update = proposedUpdate
248 coerceWithException(state, update)
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300249 } else {
250 /*
251 * Proposed update is exception => transition to terminal state
252 * E.g.
253 * ```
254 * withContext(ctx) {
255 * outerJob.cancel() // -> cancelling
256 * throw Exception() // -> completed exceptionally
257 * }
258 * ```
259 */
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300260 val exception = proposedUpdate.cause
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300261 val currentException = state.cancel.cause
262 // Add to suppressed if original cancellation differs from proposed exception
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300263 if (currentException !is CancellationException || currentException.cause !== exception) {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300264 exception.addSuppressedThrowable(currentException)
265 }
266
267 update = CompletedExceptionally(exception)
268 }
269
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300270 if (updateStateToFinal(state, update, resumeMode)) {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300271 return
272 }
273 }
274 }
275
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300276 is NotCompleted -> {
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300277 if (updateStateToFinal(state, proposedUpdate, resumeMode)) return
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300278 }
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300279 is CancelledContinuation -> {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300280 if (proposedUpdate is NotCompleted || proposedUpdate is CompletedExceptionally) {
281 error("Unexpected update, state: $state, update: $proposedUpdate")
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300282 }
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300283 // Coroutine is dispatched normally (e.g.via `delay()`) after cancellation
284 return
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300285 }
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300286 else -> error("Already resumed, but proposed with update $proposedUpdate")
287 }
288 }
289 }
290
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +0300291 // Coerce current cancelling state with proposed exception
292 private fun coerceWithException(state: Cancelling, proposedUpdate: CompletedExceptionally) {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300293 val originalCancellation = state.cancel
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300294 val originalException = originalCancellation.cause
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300295 val updateCause = proposedUpdate.cause
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +0300296 // Cause of proposed update is present and differs from one in current state
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300297 val isSameCancellation = originalCancellation.cause is CancellationException
298 && originalException.cause === updateCause.cause
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +0300299 if (!isSameCancellation && (originalException.cause !== updateCause)) {
300 proposedUpdate.cause.addSuppressedThrowable(originalException)
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300301 }
302 }
303
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300304 /**
305 * Tries to make transition from active to cancelled or completed state and invokes cancellation handler if necessary
306 */
307 private fun updateStateToFinal(expect: NotCompleted, proposedUpdate: Any?, mode: Int): Boolean {
308 if (!tryUpdateStateToFinal(expect, proposedUpdate)) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300309 return false
310 }
311
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300312 completeStateUpdate(expect, proposedUpdate, mode)
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300313 return true
314 }
315
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300316 protected fun tryUpdateStateToFinal(expect: NotCompleted, update: Any?): Boolean {
317 require(update !is NotCompleted) // only NotCompleted -> completed transition is allowed
318 if (!_state.compareAndSet(expect, update)) return false
319 // Unregister from parent job
320 parentHandle?.let {
321 it.dispose() // volatile read parentHandle _after_ state was updated
322 parentHandle = NonDisposableHandle // release it just in case, to aid GC
323 }
324 return true // continues in completeStateUpdate
325 }
326
327 protected fun completeStateUpdate(expect: NotCompleted, update: Any?, mode: Int) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300328 val exceptionally = update as? CompletedExceptionally
329 onCompletionInternal(mode)
330
331 // Invoke cancellation handlers only if necessary
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300332 if (update is CancelledContinuation && expect is CancelHandler) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300333 try {
334 expect.invoke(exceptionally?.cause)
335 } catch (ex: Throwable) {
336 handleException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
337 }
338 }
339 }
340
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300341 private fun handleException(exception: Throwable) {
342 handleCoroutineException(context, exception)
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300343 }
344
345 // For nicer debugging
346 public override fun toString(): String =
347 "${nameString()}{${stateString()}}@$hexAddress"
348
349 protected open fun nameString(): String = classSimpleName
350
351 private fun stateString(): String {
352 val state = this.state
353 return when (state) {
354 is NotCompleted ->"Active"
355 is CancelledContinuation -> "Cancelled"
356 is CompletedExceptionally -> "CompletedExceptionally"
357 else -> "Completed"
358 }
359 }
360
361}
362
363// Marker for active continuation
364internal interface NotCompleted
365
366private class Active : NotCompleted
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300367private val ACTIVE: Active = Active()
368
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300369// In progress of cancellation
370internal class Cancelling(@JvmField val cancel: CancelledContinuation) : NotCompleted
371
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300372internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300373
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300374// Wrapper for lambdas, for the performance sake CancelHandler can be subclassed directly
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300375private class InvokeOnCancel( // Clashes with InvokeOnCancellation
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300376 private val handler: CompletionHandler
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300377) : CancelHandler() {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300378 override fun invoke(cause: Throwable?) {
379 handler.invoke(cause)
380 }
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300381 override fun toString() = "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]"
382}