blob: e93eca3a17a22c5259236a200b86d1717da42c47 [file] [log] [blame]
Roman Elizarov2b12d582017-06-22 20:12:19 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental
18
Roman Elizarovaa461cf2018-04-11 13:20:29 +030019import kotlinx.atomicfu.*
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030020import kotlinx.coroutines.experimental.internalAnnotations.*
Roman Elizarovaa461cf2018-04-11 13:20:29 +030021import kotlin.coroutines.experimental.*
22import kotlin.coroutines.experimental.intrinsics.*
Roman Elizarov7753f8e2017-08-15 11:16:33 +030023
24private const val UNDECIDED = 0
25private const val SUSPENDED = 1
26private const val RESUMED = 2
27
Roman Elizarov2b12d582017-06-22 20:12:19 +030028/**
29 * @suppress **This is unstable API and it is subject to change.**
30 */
31internal abstract class AbstractContinuation<in T>(
Roman Elizarovf2239e12018-01-10 16:25:25 +030032 public final override val delegate: Continuation<T>,
33 public final override val resumeMode: Int
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030034) : Continuation<T>, DispatchedTask<T> {
35
36 /*
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +030037 * Implementation notes
38 *
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030039 * AbstractContinuation is a subset of Job with following limitations:
40 * 1) It can have only cancellation listeners
41 * 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
42 * 3) It can have at most one cancellation listener
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +030043 * 4) Its cancellation listeners cannot be deregistered
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030044 * As a consequence it has much simpler state machine, more lightweight machinery and
45 * less dependencies.
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +030046 *
47 * Cancelling state
48 * If useCancellingState is true, then this continuation can have additional cancelling state,
49 * which is transition from Active to Cancelled. This is specific state to support withContext(ctx)
50 * construction: block in withContext can be cancelled from withing or even before stepping into withContext,
51 * but we still want to properly run it (e.g. when it has atomic cancellation mode) and run its completion listener
52 * after.
53 * During cancellation all pending exceptions are aggregated and thrown during transition to final state
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030054 */
Roman Elizarov2b12d582017-06-22 20:12:19 +030055
56 /* decision state machine
57
58 +-----------+ trySuspend +-----------+
59 | UNDECIDED | -------------> | SUSPENDED |
60 +-----------+ +-----------+
61 |
62 | tryResume
63 V
64 +-----------+
65 | RESUMED |
66 +-----------+
67
68 Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
69 */
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030070 private val _decision = atomic(UNDECIDED)
71
72 /*
73 === Internal states ===
74 name state class public state description
75 ------ ------------ ------------ -----------
76 ACTIVE Active : Active active, no listeners
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +030077 SINGLE_A CancelHandler : Active active, one cancellation listener
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +030078 CANCELLING Cancelling : Active in the process of cancellation due to cancellation of parent job
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030079 CANCELLED Cancelled : Cancelled cancelled (final state)
80 COMPLETED any : Completed produced some result or threw an exception (final state)
81 */
82 private val _state = atomic<Any?>(ACTIVE)
83
84 @Volatile
85 private var parentHandle: DisposableHandle? = null
86
87 internal val state: Any? get() = _state.value
88
89 public val isActive: Boolean get() = state is NotCompleted
90
91 public val isCompleted: Boolean get() = state !is NotCompleted
92
93 public val isCancelled: Boolean get() = state is CancelledContinuation
94
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +030095 protected open val useCancellingState: Boolean get() = false
96
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +030097 internal fun initParentJobInternal(parent: Job?) {
98 check(parentHandle == null)
99 if (parent == null) {
100 parentHandle = NonDisposableHandle
101 return
102 }
103 parent.start() // make sure the parent is started
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300104 val handle = parent.invokeOnCompletion(onCancelling = true,
105 handler = ChildContinuation(parent, this).asHandler)
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300106
107 parentHandle = handle
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300108 // now check our state _after_ registering (see updateStateToFinal order of actions)
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300109 if (isCompleted) {
110 handle.dispose()
111 parentHandle = NonDisposableHandle // release it just in case, to aid GC
112 }
113 }
Roman Elizarov2b12d582017-06-22 20:12:19 +0300114
Roman Elizarovf2239e12018-01-10 16:25:25 +0300115 override fun takeState(): Any? = state
116
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300117 public fun cancel(cause: Throwable?): Boolean {
118 loopOnState { state ->
119 if (state !is NotCompleted) return false // quit if already complete
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300120 if (state is Cancelling) return false // someone else succeeded
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300121 if (tryCancel(state, cause)) return true
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300122 }
123 }
124
Roman Elizarovbcdd8e12017-10-20 16:42:06 +0800125 private fun trySuspend(): Boolean {
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300126 _decision.loop { decision ->
Roman Elizarov2b12d582017-06-22 20:12:19 +0300127 when (decision) {
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300128 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
Roman Elizarov2b12d582017-06-22 20:12:19 +0300129 RESUMED -> return false
130 else -> error("Already suspended")
131 }
132 }
133 }
134
Roman Elizarovbcdd8e12017-10-20 16:42:06 +0800135 private fun tryResume(): Boolean {
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300136 _decision.loop { decision ->
Roman Elizarov2b12d582017-06-22 20:12:19 +0300137 when (decision) {
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300138 UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
Roman Elizarov2b12d582017-06-22 20:12:19 +0300139 SUSPENDED -> return false
140 else -> error("Already resumed")
141 }
142 }
143 }
144
Roman Elizarovbcdd8e12017-10-20 16:42:06 +0800145 @PublishedApi
146 internal fun getResult(): Any? {
147 if (trySuspend()) return COROUTINE_SUSPENDED
Roman Elizarovebc88662018-01-24 23:58:56 +0300148 // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
Roman Elizarovbcdd8e12017-10-20 16:42:06 +0800149 val state = this.state
Vsevolod Tolstopyatovc1092d52018-04-12 20:22:25 +0300150 if (state is CompletedExceptionally) throw state.cause
Roman Elizarovbcdd8e12017-10-20 16:42:06 +0800151 return getSuccessfulResult(state)
152 }
Roman Elizarov2b12d582017-06-22 20:12:19 +0300153
Roman Elizarovbcdd8e12017-10-20 16:42:06 +0800154 override fun resume(value: T) =
155 resumeImpl(value, resumeMode)
156
157 override fun resumeWithException(exception: Throwable) =
158 resumeImpl(CompletedExceptionally(exception), resumeMode)
159
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300160 public fun invokeOnCancellation(handler: CompletionHandler) {
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300161 var handleCache: CancelHandler? = null
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300162 loopOnState { state ->
Roman Elizarov2b12d582017-06-22 20:12:19 +0300163 when (state) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300164 is Active -> {
165 val node = handleCache ?: makeHandler(handler).also { handleCache = it }
166 if (_state.compareAndSet(state, node)) {
167 return
168 }
Roman Elizarov2b12d582017-06-22 20:12:19 +0300169 }
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300170 is CancelHandler -> error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300171 is CancelledContinuation -> {
172 /*
173 * Continuation is complete, invoke directly.
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300174 * NOTE: multiple invokeOnCancellation calls with different handlers are allowed on cancelled continuation.
175 * It's inconsistent with running continuation, but currently, we have no mechanism to check
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300176 * whether any handler was registered during continuation lifecycle without additional overhead.
177 * This may be changed in the future.
178 *
179 * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
180 * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
181 */
182 handler.invokeIt((state as? CompletedExceptionally)?.cause)
Roman Elizarov2b12d582017-06-22 20:12:19 +0300183 return
184 }
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300185 is Cancelling -> error("Cancellation handlers for continuations with 'Cancelling' state are not supported")
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300186 else -> return
Roman Elizarov2b12d582017-06-22 20:12:19 +0300187 }
188 }
189 }
190
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300191 private fun makeHandler(handler: CompletionHandler): CancelHandler =
192 if (handler is CancelHandler) handler else InvokeOnCancel(handler)
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300193
194 private fun tryCancel(state: NotCompleted, cause: Throwable?): Boolean {
195 if (useCancellingState) {
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300196 require(state !is CancelHandler) { "Invariant: 'Cancelling' state and cancellation handlers cannot be used together" }
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300197 return _state.compareAndSet(state, Cancelling(CancelledContinuation(this, cause)))
198 }
199
200 return updateStateToFinal(state, CancelledContinuation(this, cause), mode = MODE_ATOMIC_DEFAULT)
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300201 }
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300202
203 private fun onCompletionInternal(mode: Int) {
204 if (tryResume()) return // completed before getResult invocation -- bail out
205 // otherwise, getResult has already commenced, i.e. completed later or in other thread
206 dispatch(mode)
207 }
208
209 protected inline fun loopOnState(block: (Any?) -> Unit): Nothing {
210 while (true) {
211 block(state)
212 }
213 }
214
215 protected fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) {
216 loopOnState { state ->
217 when (state) {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300218 is Cancelling -> { // withContext() support
219 /*
220 * If already cancelled block is resumed with non-exception,
221 * resume it with cancellation exception.
222 * E.g.
223 * ```
224 * val value = withContext(ctx) {
225 * outerJob.cancel() // -> cancelling
226 * 42 // -> cancelled
227 * }
228 * ```
229 * should throw cancellation exception instead of returning 42
230 */
231 if (proposedUpdate !is CompletedExceptionally) {
232 val update = state.cancel
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300233 if (updateStateToFinal(state, update, resumeMode)) return
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300234 } else {
235 /*
236 * If already cancelled block is resumed with an exception,
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +0300237 * then we should properly merge them to avoid information loss.
238 *
239 * General rule:
240 * Thrown exception always becomes a result and cancellation reason
241 * is added to suppressed exceptions if necessary.
242 * Basic duplicate/cycles check is performed
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300243 */
244 val update: CompletedExceptionally
245
246 /*
247 * Proposed update is another CancellationException.
248 * e.g.
249 * ```
250 * T1: ctxJob.cancel(e1) // -> cancelling
251 * T2:
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +0300252 * withContext(ctx, Mode.ATOMIC) {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300253 * // -> resumed with cancellation exception
254 * }
255 * ```
256 */
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300257 if (proposedUpdate.cause is CancellationException) {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300258 // Keep original cancellation cause and try add to suppressed exception from proposed cancel
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +0300259 update = proposedUpdate
260 coerceWithException(state, update)
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300261 } else {
262 /*
263 * Proposed update is exception => transition to terminal state
264 * E.g.
265 * ```
266 * withContext(ctx) {
267 * outerJob.cancel() // -> cancelling
268 * throw Exception() // -> completed exceptionally
269 * }
270 * ```
271 */
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300272 val exception = proposedUpdate.cause
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300273 val currentException = state.cancel.cause
274 // Add to suppressed if original cancellation differs from proposed exception
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300275 if (currentException !is CancellationException || currentException.cause !== exception) {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300276 exception.addSuppressedThrowable(currentException)
277 }
278
279 update = CompletedExceptionally(exception)
280 }
281
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300282 if (updateStateToFinal(state, update, resumeMode)) {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300283 return
284 }
285 }
286 }
287
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300288 is NotCompleted -> {
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300289 if (updateStateToFinal(state, proposedUpdate, resumeMode)) return
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300290 }
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300291 is CancelledContinuation -> {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300292 if (proposedUpdate is NotCompleted || proposedUpdate is CompletedExceptionally) {
293 error("Unexpected update, state: $state, update: $proposedUpdate")
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300294 }
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300295 // Coroutine is dispatched normally (e.g.via `delay()`) after cancellation
296 return
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300297 }
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300298 else -> error("Already resumed, but proposed with update $proposedUpdate")
299 }
300 }
301 }
302
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +0300303 // Coerce current cancelling state with proposed exception
304 private fun coerceWithException(state: Cancelling, proposedUpdate: CompletedExceptionally) {
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300305 val originalCancellation = state.cancel
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300306 val originalException = originalCancellation.cause
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300307 val updateCause = proposedUpdate.cause
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +0300308 // Cause of proposed update is present and differs from one in current state
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300309 val isSameCancellation = originalCancellation.cause is CancellationException
310 && originalException.cause === updateCause.cause
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +0300311 if (!isSameCancellation && (originalException.cause !== updateCause)) {
312 proposedUpdate.cause.addSuppressedThrowable(originalException)
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300313 }
314 }
315
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300316 /**
317 * Tries to make transition from active to cancelled or completed state and invokes cancellation handler if necessary
318 */
319 private fun updateStateToFinal(expect: NotCompleted, proposedUpdate: Any?, mode: Int): Boolean {
320 if (!tryUpdateStateToFinal(expect, proposedUpdate)) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300321 return false
322 }
323
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300324 completeStateUpdate(expect, proposedUpdate, mode)
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300325 return true
326 }
327
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300328 protected fun tryUpdateStateToFinal(expect: NotCompleted, update: Any?): Boolean {
329 require(update !is NotCompleted) // only NotCompleted -> completed transition is allowed
330 if (!_state.compareAndSet(expect, update)) return false
331 // Unregister from parent job
332 parentHandle?.let {
333 it.dispose() // volatile read parentHandle _after_ state was updated
334 parentHandle = NonDisposableHandle // release it just in case, to aid GC
335 }
336 return true // continues in completeStateUpdate
337 }
338
339 protected fun completeStateUpdate(expect: NotCompleted, update: Any?, mode: Int) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300340 val exceptionally = update as? CompletedExceptionally
341 onCompletionInternal(mode)
342
343 // Invoke cancellation handlers only if necessary
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300344 if (update is CancelledContinuation && expect is CancelHandler) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300345 try {
346 expect.invoke(exceptionally?.cause)
347 } catch (ex: Throwable) {
348 handleException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
349 }
350 }
351 }
352
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300353 private fun handleException(exception: Throwable) {
354 handleCoroutineException(context, exception)
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300355 }
356
357 // For nicer debugging
358 public override fun toString(): String =
359 "${nameString()}{${stateString()}}@$hexAddress"
360
361 protected open fun nameString(): String = classSimpleName
362
363 private fun stateString(): String {
364 val state = this.state
365 return when (state) {
366 is NotCompleted ->"Active"
367 is CancelledContinuation -> "Cancelled"
368 is CompletedExceptionally -> "CompletedExceptionally"
369 else -> "Completed"
370 }
371 }
372
373}
374
375// Marker for active continuation
376internal interface NotCompleted
377
378private class Active : NotCompleted
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300379private val ACTIVE: Active = Active()
380
Vsevolod Tolstopyatov4aa18aa2018-04-17 15:43:12 +0300381// In progress of cancellation
382internal class Cancelling(@JvmField val cancel: CancelledContinuation) : NotCompleted
383
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300384internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300385
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300386// Wrapper for lambdas, for the performance sake CancelHandler can be subclassed directly
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300387private class InvokeOnCancel( // Clashes with InvokeOnCancellation
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300388 private val handler: CompletionHandler
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300389) : CancelHandler() {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300390 override fun invoke(cause: Throwable?) {
391 handler.invoke(cause)
392 }
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300393 override fun toString() = "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]"
394}