Roman Elizarov | 1f74a2d | 2018-06-29 19:19:45 +0300 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| 3 | */ |
| 4 | |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 5 | package kotlinx.coroutines.experimental |
| 6 | |
| 7 | import kotlinx.atomicfu.* |
| 8 | import kotlinx.coroutines.experimental.internal.* |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 9 | import kotlinx.coroutines.experimental.intrinsics.* |
| 10 | import kotlinx.coroutines.experimental.selects.* |
| 11 | import kotlin.coroutines.experimental.* |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 12 | |
| 13 | /** |
| 14 | * A concrete implementation of [Job]. It is optionally a child to a parent job. |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 15 | * |
| 16 | * This is an open class designed for extension by more specific classes that might augment the |
| 17 | * state and may store additional state information for completed jobs, like their result values. |
| 18 | * |
| 19 | * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details. |
| 20 | * @suppress **This is unstable API and it is subject to change.** |
| 21 | */ |
Roman Elizarov | f7a6334 | 2018-09-25 00:26:25 +0300 | [diff] [blame] | 22 | internal open class JobSupport constructor(active: Boolean) : Job, ChildJob, SelectClause0 { |
/** Context key of this element; it is always the [Job] key object. */
final override val key: CoroutineContext.Key<*>
    get() = Job
| 24 | |
| 25 | /* |
| 26 | === Internal states === |
| 27 | |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 28 | name state class public state description |
| 29 | ------ ------------ ------------ ----------- |
| 30 | EMPTY_N EmptyNew : New no listeners |
| 31 | EMPTY_A EmptyActive : Active no listeners |
| 32 | SINGLE JobNode : Active a single listener |
| 33 | SINGLE+ JobNode : Active a single listener + NodeList added as its next |
| 34 | LIST_N InactiveNodeList : New a list of listeners (promoted once, does not go back to EmptyNew) |
| 35 | LIST_A NodeList : Active a list of listeners (promoted once, does not go back to JobNode/EmptyActive) |
| 36 | COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*) |
| 37 | CANCELLING Finishing : Cancelling -- " -- |
Roman Elizarov | 6685fd0 | 2018-09-25 13:23:53 +0300 | [diff] [blame] | 38 | FINAL_C Cancelled : Cancelled Cancelled (final state) |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 39 | FINAL_R <any> : Completed produced some result |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 40 | |
| 41 | === Transitions === |
| 42 | |
| 43 | New states Active states Inactive states |
| 44 | |
| 45 | +---------+ +---------+ } |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 46 | | EMPTY_N | ----> | EMPTY_A | ----+ } Empty states |
| 47 | +---------+ +---------+ | } |
| 48 | | | | ^ | +----------+ |
| 49 | | | | | +--> | FINAL_* | |
| 50 | | | V | | +----------+ |
| 51 | | | +---------+ | } |
| 52 | | | | SINGLE | ----+ } JobNode states |
| 53 | | | +---------+ | } |
| 54 | | | | | } |
| 55 | | | V | } |
| 56 | | | +---------+ | } |
| 57 | | +-------> | SINGLE+ | ----+ } |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 58 | | +---------+ | } |
| 59 | | | | |
| 60 | V V | |
| 61 | +---------+ +---------+ | } |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 62 | | LIST_N | ----> | LIST_A | ----+ } [Inactive]NodeList states |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 63 | +---------+ +---------+ | } |
| 64 | | | | | | |
| 65 | | | +--------+ | | |
| 66 | | | | V | |
| 67 | | | | +------------+ | +------------+ } |
| 68 | | +-------> | COMPLETING | --+-- | CANCELLING | } Finishing states |
| 69 | | | +------------+ +------------+ } |
| 70 | | | | ^ |
| 71 | | | | | |
| 72 | +--------+---------+--------------------+ |
| 73 | |
| 74 | |
| 75 | This state machine and its transition matrix are optimized for the common case when job is created in active |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 76 | state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes |
| 77 | successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R |
| 78 | state without going to COMPLETING state) |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 79 | |
| 80 | Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor` |
Roman Elizarov | ff0aab8 | 2018-09-22 17:11:53 +0300 | [diff] [blame] | 81 | |
| 82 | ---------- TIMELINE of state changes and notification in Job lifecycle ---------- |
| 83 | |
| 84 | | The longest possible chain of events is shown, shorter versions cut through intermediate states, |
| 85 | | while still performing all the notifications in this order. |
| 86 | |
Roman Elizarov | 6685fd0 | 2018-09-25 13:23:53 +0300 | [diff] [blame] | 87 | + Job object is created |
Roman Elizarov | ff0aab8 | 2018-09-22 17:11:53 +0300 | [diff] [blame] | 88 | ## NEW: state == EMPTY_NEW | is InactiveNodeList |
Roman Elizarov | 6685fd0 | 2018-09-25 13:23:53 +0300 | [diff] [blame] | 89 | + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle) |
| 90 | ~ waits for start |
| 91 | >> start / join / await invoked |
Roman Elizarov | ff0aab8 | 2018-09-22 17:11:53 +0300 | [diff] [blame] | 92 | ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList |
Roman Elizarov | 6685fd0 | 2018-09-25 13:23:53 +0300 | [diff] [blame] | 93 | + onStartInternal / onStart (lazy coroutine is started) |
| 94 | ~ active coroutine is working (or scheduled to execution) |
| 95 | >> childCancelled / cancelImpl invoked |
| 96 | ## CANCELLING: state is Finishing, state.rootCause != null |
| 97 | ------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancellation=true) returns NonDisposableHandle |
| 98 | ------ new children get immediately cancelled, but are still admitted to the list |
| 99 | + onCancellation |
| 100 | + notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception) |
| 101 | + cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too) |
Vsevolod Tolstopyatov | f157cec | 2018-09-24 17:22:06 +0300 | [diff] [blame] | 102 | ~ waits for completion of coroutine body |
| 103 | >> makeCompleting / makeCompletingOnce invoked |
Roman Elizarov | ff0aab8 | 2018-09-22 17:11:53 +0300 | [diff] [blame] | 104 | ## COMPLETING: state is Finishing, state.isCompleting == true |
Vsevolod Tolstopyatov | f157cec | 2018-09-24 17:22:06 +0300 | [diff] [blame] | 105 | ------ new children are not admitted anymore, attachChild returns NonDisposableHandle |
Roman Elizarov | 6685fd0 | 2018-09-25 13:23:53 +0300 | [diff] [blame] | 106 | ~ waits for children |
| 107 | >> last child completes |
| 108 | - computes the final exception |
Roman Elizarov | ff0aab8 | 2018-09-22 17:11:53 +0300 | [diff] [blame] | 109 | ## SEALED: state is Finishing, state.isSealed == true |
Roman Elizarov | 6685fd0 | 2018-09-25 13:23:53 +0300 | [diff] [blame] | 110 | ------ cancel/childCancelled returns false (cannot handle exceptions anymore) |
| 111 | + cancelParent (final exception is communicated to the parent, parent incorporates it) |
Vsevolod Tolstopyatov | f157cec | 2018-09-24 17:22:06 +0300 | [diff] [blame] | 112 | + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler) |
Roman Elizarov | 6685fd0 | 2018-09-25 13:23:53 +0300 | [diff] [blame] | 113 | ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled) |
| 114 | ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle |
Vsevolod Tolstopyatov | f157cec | 2018-09-24 17:22:06 +0300 | [diff] [blame] | 115 | + parentHandle.dispose |
| 116 | + notifyCompletion (invoke all completion listeners) |
| 117 | + onCompletionInternal / onCompleted / onCompletedExceptionally |
Roman Elizarov | ff0aab8 | 2018-09-22 17:11:53 +0300 | [diff] [blame] | 118 | |
| 119 | --------------------------------------------------------------------------------- |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 120 | */ |
| 121 | |
// Current internal state object. While there are no listeners one of the two shared
// EMPTY_* objects is used, chosen by the `active` constructor argument.
private val _state = atomic<Any?>(
    if (active) EMPTY_ACTIVE else EMPTY_NEW
)

// Handle obtained from the parent in initParentJobInternal; `null` until that call happens.
@Volatile
private var parentHandle: ChildHandle? = null
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 127 | |
| 128 | // ------------ initialization ------------ |
| 129 | |
/**
 * Attaches this job to [parent] as its child, or records that there is no parent.
 *
 * It shall be invoked at most once after construction, after all other initialization;
 * a second invocation fails the `check` that no parent handle has been stored yet.
 *
 * @param parent the parent job, or `null` when this job has no parent.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun initParentJobInternal(parent: Job?) {
    check(parentHandle == null)
    val actualParent = parent ?: run {
        // no parent -- store a handle whose disposal does nothing
        parentHandle = NonDisposableHandle
        return
    }
    actualParent.start() // make sure the parent is started
    @Suppress("DEPRECATION")
    val childHandle = actualParent.attachChild(this)
    parentHandle = childHandle
    // our own state is checked only _after_ registering (see tryFinalizeSimpleState order of actions)
    if (isCompleted) {
        childHandle.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
}
| 151 | |
| 152 | // ------------ state query ------------ |
| 153 | |
/**
 * Returns current state of this job.
 *
 * If the stored value is an [OpDescriptor] (an atomic operation in progress), the operation is
 * performed first and the state is read again, so an [OpDescriptor] is never returned.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal val state: Any?
    get() {
        _state.loop { current -> // helper loop on state (complete in-progress atomic operations)
            when (current) {
                is OpDescriptor -> current.perform(this)
                else -> return current
            }
        }
    }
| 164 | |
/**
 * Invokes [block] with the current [state] over and over again. The function itself never
 * returns normally: the only ways out are a non-local `return` from [block] or an exception.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
    while (true) block(state)
}
| 173 | |
/** `true` when the current state is an [Incomplete] state that reports itself as active. */
public override val isActive: Boolean
    get() = (state as? Incomplete)?.isActive == true
| 178 | |
/** `true` once the current state is no longer an [Incomplete] state. */
public final override val isCompleted: Boolean
    get() {
        val current = state
        return current !is Incomplete
    }
| 180 | |
/**
 * `true` when the job has finished with a [CompletedExceptionally] state, or is still
 * [Finishing] and that state is marked as cancelling.
 */
public final override val isCancelled: Boolean
    get() {
        val current = this.state
        return when (current) {
            is CompletedExceptionally -> true
            is Finishing -> current.isCancelling
            else -> false
        }
    }
| 185 | |
| 186 | // ------------ state update ------------ |
| 187 | |
/**
 * Finalizes the Finishing -> Completed (terminal state) transition.
 *
 * ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
 *
 * @param state the [Finishing] state the job is currently in; it must be marked completing and not sealed yet.
 * @param proposedUpdate the completed state proposed by the caller; must not be [Incomplete].
 * @param mode passed through unchanged to [completeStateFinalization].
 * @return always `true`.
 */
private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
    // Consistency checks
    require(proposedUpdate !is Incomplete) // only incomplete -> completed transition is allowed
    require(this.state === state) // it cannot change
    require(!state.isSealed) // cannot be sealed yet
    require(state.isCompleting) // must be marked as completing
    val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
    // Seal the state under its lock so that no more exceptions can be added, and build the final exception
    var suppressed = false
    val finalException = synchronized(state) {
        val exceptions = state.sealLocked(proposedException)
        getFinalRootCause(state, exceptions)?.also { finalCause ->
            // Also counts as "suppressed" when the final cause is not the state's root cause (due to JCE unwrapping)
            suppressed = suppressExceptions(finalCause, exceptions) || finalCause !== state.rootCause
        }
    }
    // Build the final state object: the proposed update is reused as is when there is no exception at all,
    // or when the final exception is the very same object the update already carries
    val finalState =
        if (finalException == null || finalException === proposedException) proposedUpdate
        else CompletedExceptionally(finalException)
    // Offer the exception to the parent first; handle it here only if the parent did not take it
    if (finalException != null) {
        val handledByParent = cancelParent(finalException)
        if (!handledByParent) handleJobException(finalException)
    }
    // Then CAS to completed state -> it must succeed
    require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
    // And process all post-completion actions
    completeStateFinalization(state, finalState, mode, suppressed)
    return true
}
| 224 | |
/**
 * Chooses the exception that becomes the final cause of this job from the [exceptions] collected in [state].
 *
 * @return `null` when there are no exceptions and the state is not cancelling; a freshly created
 *         job cancellation exception when there are no exceptions but the state is cancelling;
 *         otherwise an exception selected from [exceptions] as described in the body.
 */
private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
    // A case of no exceptions
    if (exceptions.isEmpty()) {
        // materialize cancellation exception if it was not materialized yet
        return if (state.isCancelling) createJobCancellationException() else null
    }
    /*
     * This is a place where we step on our API limitation:
     * We can't distinguish internal JobCancellationException from our parent
     * from external cancellation, thus we ought to collect all exceptions.
     *
     * But it has negative consequences: same exception can be added as suppressed more than once.
     * Consider concurrent parent-child relationship:
     * 1) Child throws E1 and parent throws E2.
     * 2) Parent goes to "Cancelling(E1)" and cancels child with E1
     * 3) Child goes to "Cancelling(E1)", but throws an exception E2
     * 4) When child throws, it notifies parent that it is cancelled, adding its exception to parent's list of exceptions
     * 5) Child builds final exception: E1 with suppressed E2, reports it to parent.
     * 6) Parent aggregates three exceptions: original E1, reported E2 and "final" E1.
     *    It filters the third exception, but adds the second one to the first one, thus adding suppressed duplicate.
     *
     * Note that it's only happening when both parent and child throw exception simultaneously.
     */
    val first = exceptions[0]
    // A non-cancellation exception that came first wins as is
    if (first !is CancellationException) return first
    // The first one is a cancellation exception: prefer the real cause it wraps, if any
    unwrap(first)?.let { return it }
    // Otherwise take the first collected exception that unwraps to something, falling back to `first`.
    // NOTE(review): the exception picked here is returned as is (not unwrapped), unlike the line above -- confirm intended
    return exceptions.firstOrNull { unwrap(it) != null } ?: first
}
| 260 | |
/**
 * Attaches the unwrapped form of every exception in [exceptions] except the first one to
 * [rootCause] as suppressed. Exceptions that unwrap to nothing, that unwrap to [rootCause]
 * itself, or that were already attached by this call (compared by identity) are skipped.
 *
 * @return `true` when at least one exception was attached to [rootCause].
 */
private fun suppressExceptions(rootCause: Throwable, exceptions: List<Throwable>): Boolean {
    if (exceptions.size <= 1) return false // nothing more to do here
    val alreadyAttached = identitySet<Throwable>(exceptions.size)
    var anySuppressed = false
    for (index in 1 until exceptions.size) {
        val candidate = unwrap(exceptions[index]) ?: continue
        if (candidate === rootCause) continue
        if (!alreadyAttached.add(candidate)) continue // same instance seen before
        rootCause.addSuppressedThrowable(candidate)
        anySuppressed = true
    }
    return anySuppressed
}
| 276 | |
/**
 * Follows the `cause` chain of [exception] for as long as the current link is a
 * [CancellationException].
 *
 * @return the first exception in the chain that is not a [CancellationException], or `null`
 *         when the chain ends on a [CancellationException] that has no cause.
 */
private tailrec fun unwrap(exception: Throwable): Throwable? {
    if (exception !is CancellationException) return exception
    val cause = exception.cause ?: return null
    return unwrap(cause)
}
Vsevolod Tolstopyatov | 06f57aa | 2018-07-24 19:51:21 +0300 | [diff] [blame] | 284 | |
/**
 * Fast-path method to finalize normally completed coroutines without children.
 *
 * @param state the expected current state; must be an [Empty] or a [JobNode].
 * @param update the completed state to install; must not be [CompletedExceptionally].
 * @param mode passed through unchanged to [completeStateFinalization].
 * @return `false` when the state has changed concurrently and nothing was done, `true` otherwise.
 */
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
    check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
    check(update !is CompletedExceptionally) // only for normal completion
    val installed = _state.compareAndSet(state, update)
    if (installed) completeStateFinalization(state, update, mode, suppressed = false)
    return installed
}
| 293 | |
/**
 * Runs all post-completion actions after the job has been moved from [state] to the final state [update].
 * The order of the steps below is important.
 *
 * @param state the incomplete state the job was in just before completion.
 * @param update the final state that has been installed.
 * @param mode passed through unchanged to [onCompletionInternal].
 * @param suppressed `true` when any exceptions were suppressed while building the final completion cause.
 */
private fun completeStateFinalization(state: Incomplete, update: Any?, mode: Int, suppressed: Boolean) {
    /*
     * Now the job is in THE FINAL state. We need to properly handle the resulting state.
     *
     * 1) Unregister from parent job.
     */
    val handle = parentHandle // volatile read parentHandle _after_ state was updated
    if (handle != null) {
        handle.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
    val cause = if (update is CompletedExceptionally) update.cause else null
    /*
     * 2) Invoke onCancellation: for resource cancellation etc.
     *    Only notify if it was not notified yet, that is when the previous state was not cancelling.
     *    Note: we do not use notifyCancelling here, since we are going to invoke all completion as our next step
     */
    if (!state.isCancelling) onCancellation(cause)
    /*
     * 3) Invoke completion handlers: .join(), callbacks etc.
     *    It's important to invoke them only AFTER exception handling, see #208
     */
    when (state) {
        is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler (common case)
            try {
                state.invoke(cause)
            } catch (ex: Throwable) {
                handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
            }
        }
        else -> state.list?.notifyCompletion(cause)
    }
    /*
     * 4) Invoke onCompletionInternal: onNext(), timeout de-registration etc.
     *    It should be last so all callbacks observe consistent state
     *    of the job which doesn't depend on callback scheduling.
     */
    onCompletionInternal(update, mode, suppressed)
}
| 333 | |
/**
 * Performs the cancelling notifications, in this order: [onCancellation], then every
 * [JobCancellingNode] handler registered in [list], then [cancelParent].
 *
 * @param list the listeners of this job.
 * @param cause the cause of cancellation, handed to every step.
 */
private fun notifyCancelling(list: NodeList, cause: Throwable) {
    // first cancel our own children
    onCancellation(cause)
    notifyHandlers<JobCancellingNode<*>>(list, cause)
    // then cancel parent; this is tentative -- the result is ignored, so it does not matter if there is no parent
    cancelParent(cause)
}
| 341 | |
/** Invokes every [JobNode] handler registered in this list, passing it the completion [cause]. */
private fun NodeList.notifyCompletion(cause: Throwable?) {
    notifyHandlers<JobNode<*>>(this, cause)
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 344 | |
/**
 * Invokes all handlers of type [T] found in [list] with the given [cause].
 *
 * A handler that throws does not stop the remaining ones: the first failure is wrapped into a
 * [CompletionHandlerException], later failures are attached to it as suppressed, and the
 * result (if any) is passed to [handleOnCompletionException] once all handlers have run.
 */
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
    var failure: Throwable? = null
    list.forEach<T> { node ->
        try {
            node.invoke(cause)
        } catch (ex: Throwable) {
            val collected = failure
            if (collected == null) {
                failure = CompletionHandlerException("Exception in completion handler $node for $this", ex)
            } else {
                collected.addSuppressedThrowable(ex)
            }
        }
    }
    val toReport = failure
    if (toReport != null) handleOnCompletionException(toReport)
}
| 358 | |
/**
 * Starts this job if it has not been started yet, retrying for as long as [startInternal]
 * asks for a retry.
 *
 * @return `true` when this invocation started the job, `false` when the job was not in a new state.
 */
public final override fun start(): Boolean {
    loopOnState { current ->
        val outcome = startInternal(current)
        if (outcome == FALSE) return false
        if (outcome == TRUE) return true
        // any other outcome -- go around the loop again with a fresh state
    }
}
| 367 | |
// Makes a single attempt to move the given state to its active counterpart.
// returns: RETRY/FALSE/TRUE:
//   FALSE when not new,
//   TRUE when started
//   RETRY when need to retry
private fun startInternal(state: Any?): Int {
    // pick the active state that replaces the given new state, or bail out if it is not new
    val activated = when (state) {
        // EMPTY_X state -- no completion handlers
        is Empty -> if (state.isActive) return FALSE /* already active */ else EMPTY_ACTIVE
        // LIST state -- inactive with a list of completion handlers
        is InactiveNodeList -> state.list
        // not a new state
        else -> return FALSE
    }
    if (!_state.compareAndSet(state, activated)) return RETRY
    onStartInternal()
    return TRUE
}
| 388 | |
/**
 * Override to provide the actual [start] action.
 * This function is invoked exactly once when non-active coroutine is [started][start].
 * The default implementation does nothing.
 */
internal open fun onStartInternal() = Unit
| 394 | |
/**
 * Returns the [CancellationException] that describes the cancellation or completion of this job.
 *
 * For a job that has completed normally a new [JobCancellationException] with the message
 * "Job has completed normally" is returned.
 *
 * @throws IllegalStateException when the job is still new or active, i.e. it is incomplete
 *         and has no root cause of cancellation.
 */
public final override fun getCancellationException(): CancellationException {
    val current = this.state
    if (current is Finishing) {
        val rootCause = current.rootCause ?: error("Job is still new or active: $this")
        return rootCause.toCancellationException("Job is cancelling")
    }
    if (current is Incomplete) error("Job is still new or active: $this")
    if (current is CompletedExceptionally) return current.cause.toCancellationException("Job was cancelled")
    return JobCancellationException("Job has completed normally", null, this)
}
| 405 | |
/**
 * Returns this throwable itself when it already is a [CancellationException]; otherwise wraps
 * it as the cause of a new [JobCancellationException] with the given [message] for this job.
 */
private fun Throwable.toCancellationException(message: String): CancellationException {
    if (this is CancellationException) return this
    return JobCancellationException(message, this, this@JobSupport)
}
| 408 | |
/**
 * Returns the cause that signals the completion of this job -- it returns the original
 * [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
 * This function throws [IllegalStateException] when invoked for a job that has not
 * [completed][isCompleted] nor is being cancelled yet.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun getCompletionCause(): Throwable? {
    val current = state
    return when (current) {
        is Finishing -> current.rootCause
            ?: error("Job is still new or active: $this")
        is Incomplete -> error("Job is still new or active: $this")
        is CompletedExceptionally -> current.cause
        else -> null
    }
}
| 426 | |
/**
 * Registers [handler] as a completion handler. Delegates to the three-argument overload
 * with `onCancelling = false` and `invokeImmediately = true`.
 */
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 430 | |
/**
 * Registers [handler], forwarding [onCancelling] as given. Delegates to the three-argument
 * overload with `invokeImmediately = true`.
 */
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle {
    return invokeOnCompletion(onCancelling = onCancelling, invokeImmediately = true, handler = handler)
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 434 | |
/**
 * Same as the `(handler, onCancelling)` overload but with the flag first; delegates to the
 * three-argument overload with `invokeImmediately = true`.
 */
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(onCancelling_: Boolean, handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onCancelling = onCancelling_, invokeImmediately = true, handler = handler)
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 438 | |
// todo: non-final as a workaround for KT-21968, should be final in the future
/**
 * Registers [handler] on this job, retrying in a loop until the registration (or the decision not to
 * register) is consistent with the current state.
 *
 * Outcome by state:
 * * empty & active -- the state is replaced with a single node via CAS and that node is returned;
 * * empty & not active, or a single node without a list -- the state is promoted to a list and the loop retries;
 * * incomplete with a list -- the node is appended to the list if the state is still the same; when
 *   [onCancelling] is set and the state is [Finishing] with a root cause, the handler is invoked right away
 *   (if [invokeImmediately]) instead of, or for a child handle in addition to, being appended;
 * * complete -- the handler is invoked right away (if [invokeImmediately]) and [NonDisposableHandle] is returned.
 *
 * @param onCancelling selects the kind of node created by [makeNode] and enables the check of the
 *        [Finishing] state's root cause.
 * @param invokeImmediately when `true`, the handler is invoked from this call if the job is already
 *        complete (or, for [onCancelling], already has a root cause).
 * @param handler the handler to register.
 * @return the registered node, or [NonDisposableHandle] when nothing was added to the job.
 */
public override fun invokeOnCompletion(
    onCancelling: Boolean,
    invokeImmediately: Boolean,
    handler: CompletionHandler
): DisposableHandle {
    // node is created lazily and reused across retries of the loop
    var nodeCache: JobNode<*>? = null
    loopOnState { state ->
        when (state) {
            is Empty -> { // EMPTY_X state -- no completion handlers
                if (state.isActive) {
                    // try move to SINGLE state
                    val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                    if (_state.compareAndSet(state, node)) return node
                } else
                    promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
            }
            is Incomplete -> {
                val list = state.list
                if (list == null) { // SINGLE/SINGLE+
                    promoteSingleToNodeList(state as JobNode<*>)
                } else {
                    var rootCause: Throwable? = null
                    // stays NonDisposableHandle unless the node gets added to the list below
                    var handle: DisposableHandle = NonDisposableHandle
                    if (onCancelling && state is Finishing) {
                        synchronized(state) {
                            // check if we are installing cancellation handler on job that is being cancelled
                            rootCause = state.rootCause // != null if cancelling job
                            // We add node to the list in two cases --- either the job is not being cancelled
                            // or we are adding a child to a coroutine that is not completing yet
                            if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
                                // Note: add node to the list while holding lock on state (make sure it cannot change)
                                val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                                if (!addLastAtomic(state, list, node)) return@loopOnState // retry
                                // just return node if we don't have to invoke handler (not cancelling yet)
                                if (rootCause == null) return node
                                // otherwise handler is invoked immediately out of the synchronized section & handle returned
                                handle = node
                            }
                        }
                    }
                    if (rootCause != null) {
                        // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
                        if (invokeImmediately) handler.invokeIt(rootCause)
                        return handle
                    } else {
                        // not cancelling (or not an onCancelling handler) -- plain append, retry the loop on failure
                        val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                        if (addLastAtomic(state, list, node)) return node
                    }
                }
            }
            else -> { // is complete
                // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
                return NonDisposableHandle
            }
        }
    }
}
| 499 | |
/**
 * Produces the list node for [handler]. When the handler already is a node of the matching kind it is
 * reused (after checking that it belongs to this job); otherwise it is wrapped into
 * [InvokeOnCancelling] or [InvokeOnCompletion] depending on [onCancelling].
 */
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
    if (onCancelling) {
        val cancellingNode = handler as? JobCancellingNode<*>
        if (cancellingNode != null) {
            require(cancellingNode.job === this)
            return cancellingNode
        }
        return InvokeOnCancelling(this, handler)
    }
    val completionNode = handler as? JobNode<*>
    if (completionNode != null) {
        // a cancelling node must not be registered as a plain completion node
        require(completionNode.job === this && completionNode !is JobCancellingNode<*>)
        return completionNode
    }
    return InvokeOnCompletion(this, handler)
}
| 508 | |
/**
 * Appends [node] to [list] on the condition that the job state is still the very same object as [expect].
 */
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>): Boolean {
    return list.addLastIf(node) { this.state === expect }
}
| 511 | |
/**
 * Attempts (single CAS, result ignored) to replace the empty [state] with a fresh list state that
 * keeps the same activity: a [NodeList] when active, an [InactiveNodeList] wrapper otherwise.
 */
private fun promoteEmptyToNodeList(state: Empty) {
    val nodes = NodeList()
    val replacement: Any = when {
        state.isActive -> nodes
        else -> InactiveNodeList(nodes)
    }
    _state.compareAndSet(state, replacement)
}
| 518 | |
/**
 * Attempts to promote a single-node [state] to a list state (SINGLE+).
 */
private fun promoteSingleToNodeList(state: JobNode<*>) {
    // link a new list to the node unless something is linked to it already
    state.addOneIfEmpty(NodeList())
    // The next node is either our NodeList or whatever a concurrent winner installed.
    // Publish it only if the job state is still this node; the caller continues its lock-free loop either way.
    _state.compareAndSet(state, state.nextNode)
}
| 527 | |
/**
 * Suspends until this job is complete. When no waiting is needed the caller's own context is
 * checked via `checkCompletion` and the function returns without suspending.
 */
public final override suspend fun join() {
    if (joinInternal()) {
        // slow-path: wait for completion
        joinSuspend()
    } else {
        // fast-path: nothing to wait for, do not suspend
        coroutineContext.checkCompletion()
    }
}
| 535 | |
/**
 * Returns `false` when the job is already complete (no need to wait) and `true` when the caller has
 * to wait; loops while `startInternal` reports a negative result (retry).
 */
private fun joinInternal(): Boolean {
    loopOnState { current ->
        if (current is Incomplete) {
            if (startInternal(current) >= 0) return true // wait unless need to retry
        } else {
            return false // not active anymore (complete)
        }
    }
}
| 542 | |
/**
 * Slow path of [join]: registers a [ResumeOnCompletion] handler for the suspended continuation and
 * arranges for that registration to be disposed when the continuation is cancelled.
 */
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { continuation ->
    val resumer = ResumeOnCompletion(this, continuation)
    val registration = invokeOnCompletion(handler = resumer.asHandler)
    continuation.disposeOnCancellation(registration)
}
| 547 | |
/** The select clause for joining this job; the job itself implements [SelectClause0]. */
public final override val onJoin: SelectClause0
    get() {
        return this
    }
| 550 | |
// registerSelectJoin
/**
 * Registers the join clause in [select]. If the job is already complete the clause is selected
 * at once and [block] is started; otherwise a [SelectJoinOnCompletion] waiter is registered.
 */
public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
    loopOnState { current ->
        // another clause has won already
        if (select.isSelected) return
        if (current is Incomplete) {
            if (startInternal(current) == 0) {
                // slow-path -- register waiter for completion
                val waiter = SelectJoinOnCompletion(this, select, block)
                select.disposeOnSelect(invokeOnCompletion(handler = waiter.asHandler))
                return
            }
            // any other result -- go around the loop and look at the state again
        } else {
            // fast-path -- already complete, select result
            if (select.trySelect(null)) {
                select.completion.context.checkCompletion() // always check for our completion
                block.startCoroutineUnintercepted(select.completion)
            }
            return
        }
    }
}
| 571 | |
/**
 * Unregisters [node] from this job; what has to be done depends on the current state.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun removeNode(node: JobNode<*>) {
    loopOnState { current ->
        if (current is JobNode<*>) {
            // SINGLE/SINGLE+ state -- one completion handler
            if (current !== node) return // a different job node --> we were already removed
            // try to revert back to the empty state, retry the loop when CAS fails
            if (_state.compareAndSet(current, EMPTY_ACTIVE)) return
        } else {
            // an incomplete state may carry a list of handlers -- unlink the node from it;
            // a complete state has no handlers and there is nothing to do
            if (current is Incomplete && current.list != null) node.remove()
            return
        }
    }
}
| 593 | |
/**
 * Is `true` for jobs that have no "body block" to complete and so should go straight into the
 * completing state and start waiting for children when cancelled. The default is `false`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open val onCancelComplete: Boolean
    get() = false
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 601 | |
// External cancel without a cause, never invoked implicitly from internal machinery.
public override fun cancel(): Boolean {
    // must delegate here, because some classes override cancel(x)
    return cancel(null)
}
Roman Elizarov | 27b8f45 | 2018-09-20 21:23:41 +0300 | [diff] [blame] | 605 | |
// External cancel with an (optional) cause, never invoked implicitly from internal machinery.
// The result is true only when cancelImpl succeeded and this job handles exceptions.
public override fun cancel(cause: Throwable?): Boolean {
    return cancelImpl(cause) && handlesException
}
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 609 | |
// Parent is cancelling this child: the parent job itself is passed on as the cause,
// and the outcome of cancelImpl is deliberately not reported back.
public final override fun parentCancelled(parentJob: Job) {
    cancelImpl(cause = parentJob)
}
| 614 | |
// A child was cancelled with the given cause.
// The result is true only when cancelImpl succeeded and this job handles exceptions.
internal fun childCancelled(cause: Throwable): Boolean {
    return cancelImpl(cause) && handlesException
}
Roman Elizarov | f7a6334 | 2018-09-25 00:26:25 +0300 | [diff] [blame] | 618 | |
// cause is a Throwable, or the parent Job when invoked from parentCancelled
// returns true if the exception was handled, false otherwise
private fun cancelImpl(cause: Any?): Boolean {
    // For jobs with onCancelComplete, first try to make the job completing; a `true` result means that
    // it was made completing and the exception was recorded. Otherwise the exception is just
    // recorded via makeCancelling.
    return (onCancelComplete && cancelMakeCompleting(cause)) || makeCancelling(cause)
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 630 | |
/**
 * Tries to complete this job exceptionally with the exception created from [cause].
 * Returns `false` without proposing anything when the job is already complete or completing.
 */
private fun cancelMakeCompleting(cause: Any?): Boolean {
    loopOnState { current ->
        // already completed -- do not even propose update
        if (current !is Incomplete) return false
        // already completing -- do not even propose update
        if (current is Finishing && current.isCompleting) return false
        val failure = CompletedExceptionally(createCauseException(cause))
        val outcome = tryMakeCompleting(current, failure, mode = MODE_ATOMIC_DEFAULT)
        if (outcome == COMPLETING_ALREADY_COMPLETING) return false
        if (outcome == COMPLETING_COMPLETED || outcome == COMPLETING_WAITING_CHILDREN) return true
        // the only remaining legal outcome is a retry, which is what falling through the loop body does
        if (outcome != COMPLETING_RETRY) error("unexpected result")
    }
}
| 645 | |
/** Creates the default cancellation exception of this job (no cause attached). */
private fun createJobCancellationException(): JobCancellationException {
    return JobCancellationException("Job was cancelled", null, this)
}
| 648 | |
// cause is a Throwable, or a Job when the parent cancels this child; cause can be null only on cancel
private fun createCauseException(cause: Any?): Throwable {
    // no cause given -- materialize the default cancellation exception
    if (cause == null) return createJobCancellationException()
    // a throwable is used as is, anything else must be a job that provides its cancellation exception
    return (cause as? Throwable) ?: (cause as Job).getCancellationException()
}
| 654 | |
/**
 * Records [cause] and moves this job towards the cancelling state, retrying until the update is applied
 * to a consistent state.
 *
 * [cause] is a [Throwable] or a [Job] (see [createCauseException]); it can be `null` only on cancel.
 *
 * @return `true` when the exception was recorded in (or the job was moved to) a finishing state,
 *         `false` when the finishing state is already sealed or the job is already complete.
 */
private fun makeCancelling(cause: Any?): Boolean {
    var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
    loopOnState { state ->
        when (state) {
            is Finishing -> { // already finishing -- collect exceptions
                val notifyRootCause = synchronized(state) {
                    if (state.isSealed) return false // too late, already sealed -- cannot add exception nor mark cancelled
                    // add exception, do nothing if parent is cancelling child that is already being cancelled
                    val wasCancelling = state.isCancelling // will notify if was not cancelling
                    // Materialize missing exception if it is the first exception (otherwise -- don't)
                    if (cause != null || !wasCancelling) {
                        val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                        state.addExceptionLocked(causeException)
                    }
                    // take cause for notification if it was not cancelling before
                    state.rootCause.takeIf { !wasCancelling }
                }
                // notification is performed outside of the synchronized section
                notifyRootCause?.let { notifyCancelling(state.list, it) }
                return true
            }
            is Incomplete -> {
                // Not yet finishing -- try to make it cancelling
                val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                if (state.isActive) {
                    // active state becomes cancelling; on failure the loop retries with a fresh state
                    if (tryMakeCancelling(state, causeException)) return true
                } else {
                    // non active state starts completing
                    when (tryMakeCompleting(state, CompletedExceptionally(causeException), mode = MODE_ATOMIC_DEFAULT)) {
                        COMPLETING_ALREADY_COMPLETING -> error("Cannot happen in $state")
                        COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true // ok
                        COMPLETING_RETRY -> return@loopOnState
                        else -> error("unexpected result")
                    }
                }
            }
            else -> return false // already complete
        }
    }
}
| 697 | |
// Provides the NodeList to be used by a Cancelling state created from the given incomplete state,
// promoting the state when necessary. Returns null when the caller needs to retry.
private fun getOrPromoteCancellingList(state: Incomplete): NodeList? {
    val existing = state.list
    if (existing != null) return existing
    // we can allocate a new empty list that'll get integrated into Cancelling state
    if (state is Empty) return NodeList()
    if (state is JobNode<*>) {
        // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
        // correctly capture a reference to it
        promoteSingleToNodeList(state)
        return null // retry
    }
    error("State should have list: $state")
}
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 711 | |
// Tries to install a new Cancelling state on the condition that we're still in the expected state.
// Returns false when the caller has to retry.
private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
    check(state !is Finishing) // only for non-finishing states
    check(state.isActive) // only for active states
    // get state's list or else promote to list to correctly operate on child lists
    val handlers = getOrPromoteCancellingList(state) ?: return false
    // install the cancelling state (with rootCause!) and notify listeners only if we have won the CAS
    val installed = _state.compareAndSet(state, Finishing(handlers, false, rootCause))
    if (installed) notifyCancelling(handlers, rootCause)
    return installed
}
| 725 | |
/**
 * Used by [CompletableDeferred.complete] (and its exceptional counterpart) and by [JobImpl.cancel].
 * Unlike [makeCompletingOnce] it simply returns `false` on repeated invocation, that is when this job
 * is already completing.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun makeCompleting(proposedUpdate: Any?): Boolean {
    loopOnState { current ->
        val outcome = tryMakeCompleting(current, proposedUpdate, mode = MODE_ATOMIC_DEFAULT)
        if (outcome == COMPLETING_ALREADY_COMPLETING) return false
        if (outcome == COMPLETING_COMPLETED || outcome == COMPLETING_WAITING_CHILDREN) return true
        // the only remaining legal outcome is a retry, which is what falling through the loop body does
        if (outcome != COMPLETING_RETRY) error("unexpected result")
    }
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 740 | |
/**
 * Used by [AbstractCoroutine.resume]. A repeated invocation (when this job is already completing)
 * is an error and throws.
 *
 * Returns:
 * * `true` if the state was updated to completed/cancelled;
 * * `false` if the job was made completing or it is cancelling and is waiting for children.
 *
 * @throws IllegalStateException if job is already complete or completing
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean {
    loopOnState { current ->
        val outcome = tryMakeCompleting(current, proposedUpdate, mode)
        if (outcome == COMPLETING_ALREADY_COMPLETING) {
            val message = "Job $this is already complete or completing, " +
                "but is being completed with $proposedUpdate"
            throw IllegalStateException(message, proposedUpdate.exceptionOrNull)
        }
        if (outcome == COMPLETING_COMPLETED) return true
        if (outcome == COMPLETING_WAITING_CHILDREN) return false
        // the only remaining legal outcome is a retry, which is what falling through the loop body does
        if (outcome != COMPLETING_RETRY) error("unexpected result")
    }
}
| 762 | |
/**
 * Makes a single attempt to start completing this job from the given [state] with [proposedUpdate].
 *
 * Returns one of the `COMPLETING_*` codes:
 * * `COMPLETING_ALREADY_COMPLETING` -- [state] is not [Incomplete] or is already marked as completing;
 * * `COMPLETING_COMPLETED` -- the final state was installed;
 * * `COMPLETING_WAITING_CHILDREN` -- a completion handler was installed on a child that is not complete yet;
 * * `COMPLETING_RETRY` -- the state has changed concurrently and the caller should retry with a fresh state.
 */
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?, mode: Int): Int {
    if (state !is Incomplete)
        return COMPLETING_ALREADY_COMPLETING
    /*
     * FAST PATH -- no children to wait for && simple state (no list) && not cancelling => can complete immediately
     * Cancellation (failures) always have to go through Finishing state to serialize exception handling.
     * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
     * which may miss unhandled exception.
     */
    if ((state is Empty || state is JobNode<*>) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
        if (!tryFinalizeSimpleState(state, proposedUpdate, mode)) return COMPLETING_RETRY
        return COMPLETING_COMPLETED
    }
    // get state's list or else promote to list to correctly operate on child lists
    val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
    // promote to Finishing state if we are not in it yet
    // This promotion has to be atomic w.r.t. state change, so that a coroutine that is not active yet
    // atomically transitions to finishing & completing state
    val finishing = state as? Finishing ?: Finishing(list, false, null)
    // must synchronize updates to finishing state
    var notifyRootCause: Throwable? = null
    synchronized(finishing) {
        // check if this state is already completing
        if (finishing.isCompleting) return COMPLETING_ALREADY_COMPLETING
        // mark as completing
        finishing.isCompleting = true
        // if we need to promote to finishing then atomically do it here.
        // We do it as early as possible while still holding the lock. This ensures that we cancelImpl asap
        // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
        if (finishing !== state) {
            if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
        }
        // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
        require(!finishing.isSealed) // cannot be sealed
        // add new proposed exception to the finishing state
        val wasCancelling = finishing.isCancelling
        (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
        // If it just becomes cancelling --> must process cancelling notifications
        notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
    }
    // process cancelling notification here -- it cancels all the children _before_ we start to wait for them (sic!!!)
    notifyRootCause?.let { notifyCancelling(list, it) }
    // now wait for children
    val child = firstChild(state)
    if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
        return COMPLETING_WAITING_CHILDREN
    // otherwise -- we have no children left to wait for (all were already cancelled?)
    if (tryFinalizeFinishingState(finishing, proposedUpdate, mode))
        return COMPLETING_COMPLETED
    // otherwise retry
    return COMPLETING_RETRY
}
| 815 | |
/** The cause of this value when it is a [CompletedExceptionally], `null` for anything else. */
private val Any?.exceptionOrNull: Throwable?
    get() = if (this is CompletedExceptionally) this.cause else null
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 818 | |
/**
 * Finds the first child handle of [state]: the state itself when it is a [ChildHandleNode],
 * otherwise the first child found in its list (if it has a list).
 */
private fun firstChild(state: Incomplete): ChildHandleNode? {
    if (state is ChildHandleNode) return state
    return state.list?.nextChild()
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 821 | |
// Walks the children starting from the given one and installs a completion handler on the first
// child that accepts it. Returns false when there are no more incomplete children to wait for.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
    var candidate = child
    while (true) {
        val completion = ChildCompletion(this, state, candidate, proposedUpdate)
        val handle = candidate.childJob.invokeOnCompletion(
            invokeImmediately = false,
            handler = completion.asHandler
        )
        // child is not complete and we've started waiting for it
        if (handle !== NonDisposableHandle) return true
        // this child is complete already -- move on to the next one, if any
        candidate = candidate.nextChild() ?: return false
    }
}
| 833 | |
// Resumes completion after lastChild: waits for the next child or, when none is left, tries to
// finalize the finishing state.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
    require(this.state === state) // consistency check -- it cannot change while we are waiting for children
    // figure out if there is a next child and try to wait for it
    val next = lastChild.nextChild()
    val stillWaiting = next != null && tryWaitForChild(state, next, proposedUpdate)
    // no more children to wait for -- try to update state (the result is not used either way)
    if (!stillWaiting) tryFinalizeFinishingState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)
}
| 844 | |
// Finds the next non-removed ChildHandleNode after the receiver, or `null` once the list head is reached.
private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? {
    var node = this
    // rollback to prev non-removed (or list head)
    while (node.isRemoved) node = node.prevNode
    while (true) {
        node = node.nextNode
        when {
            node.isRemoved -> Unit // skip removed nodes
            node is ChildHandleNode -> return node
            node is NodeList -> return null // checked all -- no more children
        }
    }
}
| 855 | |
// Lazily yields the child jobs found in the state that is read when the sequence is iterated:
// the single child when the state itself is a ChildHandleNode, otherwise every ChildHandleNode in the state's list.
public final override val children: Sequence<Job> get() = buildSequence {
    val current = this@JobSupport.state
    if (current is ChildHandleNode) {
        yield(current.childJob)
    } else if (current is Incomplete) {
        val nodes = current.list
        if (nodes != null) {
            nodes.forEach<ChildHandleNode> { yield(it.childJob) }
        }
    }
}
| 865 | |
@Suppress("OverridingDeprecatedMember")
public final override fun attachChild(child: ChildJob): ChildHandle {
    /*
     * Note: This function attaches a special ChildHandleNode node object. This node object
     * is handled in a special way on completion of the coroutine (we wait for all of them) and
     * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
     * if the job is already cancelling. For cancelling state child is attached under state lock.
     * It's required to properly wait for all children before completion and to provide a linearizable
     * hierarchy view: if child is attached when job is already being cancelled, such child will receive
     * immediate notification on cancellation, but parent *will* wait for that child before completion
     * and will handle its exception.
     */
    val node = ChildHandleNode(this, child)
    val registration = invokeOnCompletion(onCancelling = true, handler = node.asHandler)
    return registration as ChildHandle
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 879 | |
@Suppress("OverridingDeprecatedMember")
public final override fun cancelChildren(cause: Throwable?) {
    // Delegates to the `cancelChildren` extension function.
    // NOTE(review): this relies on the call resolving to the extension rather than to this member -- confirm.
    this.cancelChildren(cause)
}
| 884 | |
/**
 * Override to process any exceptions that were encountered while invoking completion handlers
 * installed via [invokeOnCompletion]. The default implementation rethrows [exception].
 *
 * @param exception the exception to process.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun handleOnCompletionException(exception: Throwable) {
    throw exception
}
| 893 | |
/**
 * This function is invoked once when job is being cancelled, fails, or is completed.
 * It's an optimization for [invokeOnCompletion] with `onCancelling` set to `true`.
 * The default implementation does nothing.
 *
 * @param cause the cause passed by the caller, if any.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onCancellation(cause: Throwable?) {}
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 901 | |
/**
 * When this property is `true` the parent is cancelled on cancellation of this job.
 * Note, that [CancellationException] is considered "normal" and parent is not cancelled when child produces it.
 * This allows parent to cancel its children (normally) without being cancelled itself, unless
 * child crashes and produces some other exception during its completion.
 * Defaults to `false`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open val cancelsParent: Boolean
    get() = false
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 911 | |
/**
 * Returns `true` for jobs that handle their exceptions via [handleJobException] or integrate them
 * into the job's result via [onCompletionInternal]. The only instance of the [Job] that does not
 * handle its exceptions is [JobImpl]. Defaults to `true`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open val handlesException: Boolean
    get() = true
Roman Elizarov | 67912f9 | 2018-09-16 01:46:43 +0300 | [diff] [blame] | 920 | |
/**
 * This method is invoked **exactly once** when the final exception of the job is determined
 * and before it becomes complete. At the moment of invocation the job and all its children are complete.
 * The default implementation does nothing.
 *
 * @param exception the final exception of the job.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun handleJobException(exception: Throwable) {}
Vsevolod Tolstopyatov | 06f57aa | 2018-07-24 19:51:21 +0300 | [diff] [blame] | 928 | |
// Decides whether (and how) the parent gets involved in this job's cancellation with `cause`.
private fun cancelParent(cause: Throwable): Boolean = when {
    // CancellationException is considered "normal" and parent is not cancelled when child produces it.
    // This allows parent to cancel its children (normally) without being cancelled itself, unless
    // child crashes and produces some other exception during its completion.
    // In that case `true` is reported without contacting the parent at all.
    cause is CancellationException -> true
    // this kind of job never cancels its parent
    !cancelsParent -> false
    // otherwise the answer comes from the parent (`false` when there is no parent handle)
    else -> parentHandle?.childCancelled(cause) == true
}
| 937 | |
/**
 * Override for post-completion actions that need to do something with the state.
 * The default implementation does nothing.
 *
 * @param state the final state.
 * @param mode completion mode.
 * @param suppressed `true` when any exceptions were suppressed while building the final completion cause.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 946 | |
// for nicer debugging: renders as `<name>{<state>}@<hex address>`
public override fun toString(): String {
    val name = nameString()
    val stateText = stateString()
    return "$name{$stateText}@$hexAddress"
}
| 950 | |
/**
 * Name used as the first part of [toString]; defaults to the simple class name.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun nameString(): String {
    return classSimpleName
}
| 955 | |
// Human-readable name of the current state, used by toString().
private fun stateString(): String {
    val current = this.state
    if (current is Finishing) {
        // "Cancelling" or "Active", with "Completing" appended when completion is already in progress
        val phase = if (current.isCancelling) "Cancelling" else "Active"
        val suffix = if (current.isCompleting) "Completing" else ""
        return phase + suffix
    }
    if (current is Incomplete) {
        return if (current.isActive) "Active" else "New"
    }
    return if (current is CompletedExceptionally) "Cancelled" else "Completed"
}
| 971 | |
// State object for the Completing & Cancelling states.
// All updates are guarded by synchronized(this), reads are volatile
@Suppress("UNCHECKED_CAST")
private class Finishing(
    override val list: NodeList,
    @Volatile
    @JvmField var isCompleting: Boolean,
    @Volatile
    @JvmField var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
) : SynchronizedObject(), Incomplete {
    @Volatile
    private var _exceptionsHolder: Any? = null // Contains null | Throwable | ArrayList | SEALED

    // Note: exceptions cannot be modified when sealed
    val isSealed: Boolean get() = _exceptionsHolder === SEALED
    val isCancelling: Boolean get() = rootCause != null
    override val isActive: Boolean get() = rootCause == null // !isCancelling

    // Seals current state and returns the list of exceptions: rootCause first (if any), then the
    // additional exceptions recorded so far, then proposedException (if present and not equal to rootCause).
    // guarded by `synchronized(this)`
    fun sealLocked(proposedException: Throwable?): List<Throwable> {
        val holder = _exceptionsHolder // volatile read
        val collected: ArrayList<Throwable> = when {
            holder == null -> allocateList()
            holder is Throwable -> allocateList().also { it.add(holder) }
            holder is ArrayList<*> -> holder as ArrayList<Throwable>
            else -> error("State is $holder") // already sealed -- cannot happen
        }
        val cause = this.rootCause // volatile read
        if (cause != null) collected.add(0, cause) // note -- rootCause goes to the beginning
        if (proposedException != null && proposedException != cause) collected.add(proposedException)
        _exceptionsHolder = SEALED
        return collected
    }

    // Records one more exception: the first one becomes rootCause, later distinct ones go to the holder
    // (a single Throwable is promoted to a list when the second additional exception arrives).
    // guarded by `synchronized(this)`
    fun addExceptionLocked(exception: Throwable) {
        val cause = this.rootCause // volatile read
        if (cause == null) {
            this.rootCause = exception
            return
        }
        if (exception === cause) return // nothing to do
        val holder = _exceptionsHolder // volatile read
        if (holder == null) {
            _exceptionsHolder = exception
        } else if (holder is Throwable) {
            if (exception !== holder) { // the very same exception again -- nothing to do
                val expanded = allocateList()
                expanded.add(holder)
                expanded.add(exception)
                _exceptionsHolder = expanded
            }
        } else if (holder is ArrayList<*>) {
            (holder as ArrayList<Throwable>).add(exception)
        } else {
            error("State is $holder") // already sealed -- cannot happen
        }
    }

    private fun allocateList() = ArrayList<Throwable>(4)

    override fun toString(): String =
        "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$_exceptionsHolder, list=$list]"
}
| 1036 | |
// `true` only for a Finishing state that already has a root cause; `false` for every other incomplete state.
private val Incomplete.isCancelling: Boolean
    get() = (this as? Finishing)?.isCancelling == true
| 1039 | |
// Used by parent that is waiting for child completion: when the child job completes,
// it hands control back to the parent so that the parent can continue its own completion.
private class ChildCompletion(
    private val parent: JobSupport,
    private val state: Finishing,
    private val child: ChildHandleNode,
    private val proposedUpdate: Any?
) : JobNode<Job>(child.childJob) {
    override fun invoke(cause: Throwable?) =
        parent.continueCompleting(state, child, proposedUpdate)

    override fun toString(): String = "ChildCompletion[$child, $proposedUpdate]"
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1053 | |
| 1054 | /* |
| 1055 | * ================================================================================================= |
| 1056 | * This is ready-to-use implementation for Deferred interface. |
| 1057 | * However, it is not type-safe. Conceptually it just exposes the value of the underlying |
| 1058 | * completed state as `Any?` |
| 1059 | * ================================================================================================= |
| 1060 | */ |
| 1061 | |
// `true` when the current state is a CompletedExceptionally instance.
public val isCompletedExceptionally: Boolean
    get() = state is CompletedExceptionally
| 1063 | |
// Returns the exception this job has completed with, or `null` when the final state carries none.
// Throws IllegalStateException when the job has not completed yet.
public fun getCompletionExceptionOrNull(): Throwable? {
    val finalState = this.state
    if (finalState is Incomplete) error("This job has not completed yet")
    return finalState.exceptionOrNull
}
| 1069 | |
/**
 * Returns the final state of a completed job, rethrowing the cause when it has completed exceptionally.
 * Throws [IllegalStateException] when the job has not completed yet.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun getCompletedInternal(): Any? {
    val finalState = this.state
    if (finalState is Incomplete) error("This job has not completed yet")
    return when (finalState) {
        is CompletedExceptionally -> throw finalState.cause
        else -> finalState
    }
}
| 1079 | |
/**
 * Awaits completion of this job and returns its final state, rethrowing the cause
 * when the job has completed exceptionally.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal suspend fun awaitInternal(): Any? {
    // fast-path -- check state (avoid extra object creation)
    do { // lock-free loop on state
        val current = this.state
        if (current !is Incomplete) {
            // already complete -- just return result
            if (current is CompletedExceptionally) throw current.cause
            return current
        }
        val started = startInternal(current)
    } while (started < 0) // a negative result means the start attempt needs to be retried
    return awaitSuspend() // slow-path
}
| 1097 | |
// Slow path of awaitInternal: suspends and resumes the continuation from a completion handler.
private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
    val completionHandle = invokeOnCompletion {
        val finalState = this.state
        check(finalState !is Incomplete)
        when (finalState) {
            is CompletedExceptionally -> cont.resumeWithException(finalState.cause)
            else -> cont.resume(finalState)
        }
    }
    // The handler only has to be disposed when the continuation is cancelled;
    // on completion we are resumed regularly by the handler itself
    cont.disposeOnCancellation(completionHandle)
}
| 1109 | |
/**
 * Registers an await-style clause in [select]: selects immediately when the job is already complete,
 * otherwise installs a completion handler that performs the selection later.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
// registerSelectAwaitInternal
@Suppress("UNCHECKED_CAST")
internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) {
    // fast-path -- check state and select/return if needed
    loopOnState { current ->
        if (select.isSelected) return
        if (current !is Incomplete) {
            // already complete -- select result
            if (select.trySelect(null)) {
                when (current) {
                    is CompletedExceptionally -> select.resumeSelectCancellableWithException(current.cause)
                    else -> block.startCoroutineUnintercepted(current as T, select.completion)
                }
            }
            return
        }
        val started = startInternal(current)
        if (started == 0) {
            // slow-path -- register waiter for completion
            val waiter = SelectAwaitOnCompletion(this, select, block)
            select.disposeOnSelect(invokeOnCompletion(handler = waiter.asHandler))
            return
        }
    }
}
| 1136 | |
/**
 * Delivers the current state to an already selected [select] instance: resumes it with the cause
 * when the state is exceptional, otherwise starts [block] with the state as its argument.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
@Suppress("UNCHECKED_CAST")
internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
    val finalState = this.state
    // Note: await is non-atomic (can be cancelled while dispatched)
    if (finalState !is CompletedExceptionally) {
        block.startCoroutineCancellable(finalState as T, select.completion)
        return
    }
    select.resumeSelectCancellableWithException(finalState.cause)
}
| 1149 | } |
| 1150 | |
Roman Elizarov | 563da40 | 2018-08-10 19:18:56 +0300 | [diff] [blame] | 1151 | // --------------- helper classes & constants for job implementation |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1152 | |
// Result codes named COMPLETING_*.
// NOTE(review): presumably the possible outcomes of an attempt to complete the job -- confirm against the function returning them.
private const val COMPLETING_ALREADY_COMPLETING: Int = 0
private const val COMPLETING_COMPLETED: Int = 1
private const val COMPLETING_WAITING_CHILDREN: Int = 2
private const val COMPLETING_RETRY: Int = 3

// Tri-state result codes.
// NOTE(review): presumably what startInternal returns (its callers treat a negative result as "retry") -- confirm.
private const val RETRY: Int = -1
private const val FALSE: Int = 0
private const val TRUE: Int = 1

// Marker stored in Finishing._exceptionsHolder once the exceptions have been sealed
private val SEALED = Symbol("SEALED")

// Shared instances of the listener-free Empty state: not active (new) and active
private val EMPTY_NEW = Empty(false)
private val EMPTY_ACTIVE = Empty(true)
Vsevolod Tolstopyatov | c9afb67 | 2018-07-24 20:30:48 +0300 | [diff] [blame] | 1166 | |
// Incomplete state without any list of nodes; `isActive` tells the active variant from the new one.
private class Empty(override val isActive: Boolean) : Incomplete {
    override val list: NodeList?
        get() = null

    override fun toString(): String {
        val label = if (isActive) "Active" else "New"
        return "Empty{$label}"
    }
}
| 1171 | |
// Plain job: created in the active state, attached to the optional parent on construction,
// and the one job kind in this file that reports `handlesException = false`.
internal class JobImpl(parent: Job? = null) : JobSupport(active = true) {
    init {
        initParentJobInternal(parent)
    }

    override val onCancelComplete: Boolean
        get() = true

    override val handlesException: Boolean
        get() = false
}
| 1177 | |
| 1178 | // -------- invokeOnCompletion nodes |
| 1179 | |
// Common shape of every state object that represents a job which has not completed yet.
internal interface Incomplete {
    // whether the state counts as active (as opposed to new / cancelling)
    val isActive: Boolean

    // is null only for Empty and JobNode incomplete state objects
    val list: NodeList?
}
| 1184 | |
// Base class for handler nodes installed on a job. A node is also a valid (active, list-less)
// incomplete state on its own and a DisposableHandle that removes itself from its job.
internal abstract class JobNode<out J : Job>(
    @JvmField val job: J
) : CompletionHandlerBase(), DisposableHandle, Incomplete {
    override val isActive: Boolean
        get() = true

    override val list: NodeList?
        get() = null

    override fun dispose() {
        (job as JobSupport).removeNode(this)
    }
}
| 1192 | |
// List head holding the nodes of a job; used directly as the "active with listeners" state.
internal class NodeList : LockFreeLinkedListHead(), Incomplete {
    override val isActive: Boolean
        get() = true

    override val list: NodeList
        get() = this

    // Renders as `List{<state>}[node, node, ...]` over all JobNode elements of this list.
    fun getString(state: String): String {
        val text = StringBuilder()
        text.append("List{")
        text.append(state)
        text.append("}[")
        var first = true
        forEach<JobNode<*>> { node ->
            if (first) first = false else text.append(", ")
            text.append(node)
        }
        text.append("]")
        return text.toString()
    }

    override fun toString(): String = getString("Active")
}
| 1211 | |
// Wrapper that presents a NodeList as a not-active ("New") incomplete state.
internal class InactiveNodeList(
    override val list: NodeList
) : Incomplete {
    override val isActive: Boolean
        get() = false

    override fun toString(): String {
        return list.getString("New")
    }
}
| 1218 | |
// Node that forwards the completion cause to a plain CompletionHandler.
private class InvokeOnCompletion(
    job: Job,
    private val handler: CompletionHandler
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        handler.invoke(cause)
    }

    override fun toString(): String = "InvokeOnCompletion[$classSimpleName@$hexAddress]"
}
| 1226 | |
// Node that resumes the given continuation with Unit when invoked (the cause is ignored).
private class ResumeOnCompletion(
    job: Job,
    private val continuation: Continuation<Unit>
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        continuation.resume(Unit)
    }

    override fun toString(): String = "ResumeOnCompletion[$continuation]"
}
| 1234 | |
// Node that disposes the given handle when invoked (the cause is ignored).
internal class DisposeOnCompletion(
    job: Job,
    private val handle: DisposableHandle
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        handle.dispose()
    }

    override fun toString(): String {
        return "DisposeOnCompletion[$handle]"
    }
}
| 1242 | |
// Node for a join-style select clause: when invoked, tries to select and, on success, starts `block`.
private class SelectJoinOnCompletion<R>(
    job: JobSupport,
    private val select: SelectInstance<R>,
    private val block: suspend () -> R
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        if (!select.trySelect(null)) return // not selected -- nothing to start
        block.startCoroutineCancellable(select.completion)
    }

    override fun toString(): String {
        return "SelectJoinOnCompletion[$select]"
    }
}
| 1254 | |
// Node for an await-style select clause: when invoked, tries to select and, on success,
// lets the job deliver its final state to the select instance.
private class SelectAwaitOnCompletion<T, R>(
    job: JobSupport,
    private val select: SelectInstance<R>,
    private val block: suspend (T) -> R
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        if (!select.trySelect(null)) return // not selected -- nothing to deliver
        job.selectAwaitCompletion(select, block)
    }

    override fun toString(): String {
        return "SelectAwaitOnCompletion[$select]"
    }
}
| 1266 | |
| 1267 | // -------- invokeOnCancellation nodes |
| 1268 | |
/**
 * Marker for a node that shall be invoked in the _cancelling_ state.
 * **Note: may be invoked multiple times.**
 */
internal abstract class JobCancellingNode<out J : Job>(job: J) : JobNode<J>(job)
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1274 | |
// Cancelling-state node that forwards the cause to a plain CompletionHandler at most once.
private class InvokeOnCancelling(
    job: Job,
    private val handler: CompletionHandler
) : JobCancellingNode<Job>(job) {
    // delegate handler shall be invoked at most once, so here is an additional flag
    private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu

    override fun invoke(cause: Throwable?) {
        if (!_invoked.compareAndSet(0, 1)) return // some earlier invocation has already won
        handler.invoke(cause)
    }

    override fun toString(): String = "InvokeOnCancelling[$classSimpleName@$hexAddress]"
}
| 1286 | |
// Node that links a parent job to one of its children: invocation tells the child that the parent
// was cancelled, and (as a ChildHandle) it passes the child's cancellation on to the parent.
internal class ChildHandleNode(
    parent: JobSupport,
    @JvmField val childJob: ChildJob
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
    override fun invoke(cause: Throwable?) {
        childJob.parentCancelled(job)
    }

    override fun childCancelled(cause: Throwable): Boolean {
        return job.childCancelled(cause)
    }

    override fun toString(): String {
        return "ChildHandle[$childJob]"
    }
}
| 1295 | |
// Same as ChildHandleNode, but for cancellable continuation: when invoked,
// cancels the continuation with the parent's cancellation exception.
internal class ChildContinuation(
    parent: Job,
    @JvmField val child: AbstractContinuation<*>
) : JobCancellingNode<Job>(parent) {
    override fun invoke(cause: Throwable?) {
        val cancellation = job.getCancellationException()
        child.cancel(cancellation)
    }

    override fun toString(): String = "ChildContinuation[$child]"
}
| 1307 | |