Roman Elizarov | 1f74a2d | 2018-06-29 19:19:45 +0300 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| 3 | */ |
| 4 | |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 5 | package kotlinx.coroutines.experimental |
| 6 | |
| 7 | import kotlinx.atomicfu.* |
| 8 | import kotlinx.coroutines.experimental.internal.* |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 9 | import kotlinx.coroutines.experimental.intrinsics.* |
| 10 | import kotlinx.coroutines.experimental.selects.* |
| 11 | import kotlin.coroutines.experimental.* |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 12 | |
| 13 | /** |
| 14 | * A concrete implementation of [Job]. It is optionally a child to a parent job. |
| 15 | * This job is cancelled when the parent is complete, but not vice versa. |
| 16 | * |
| 17 | * This is an open class designed for extension by more specific classes that might augment the |
| 18 | * state and may store additional state information for completed jobs, like their result values. |
| 19 | * |
| 20 | * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details. |
| 21 | * @suppress **This is unstable API and it is subject to change.** |
| 22 | */ |
| 23 | internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 { |
/** Context key of this element; it is always the [Job] key. */
final override val key: CoroutineContext.Key<*>
    get() {
        return Job
    }
| 25 | |
| 26 | /* |
| 27 | === Internal states === |
| 28 | |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 29 | name state class public state description |
| 30 | ------ ------------ ------------ ----------- |
| 31 | EMPTY_N EmptyNew : New no listeners |
| 32 | EMPTY_A EmptyActive : Active no listeners |
| 33 | SINGLE JobNode : Active a single listener |
| 34 | SINGLE+ JobNode : Active a single listener + NodeList added as its next |
| 35 | LIST_N InactiveNodeList : New a list of listeners (promoted once, does not go back to EmptyNew) |
| 36 | LIST_A NodeList : Active a list of listeners (promoted once, does not go back to JobNode/EmptyActive) |
| 37 | COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*) |
| 38 | CANCELLING Finishing : Cancelling -- " -- |
| 39 | FINAL_C Cancelled : Cancelled cancelled (final state) |
| 40 | FINAL_F CompletedExceptionally : Completed failed for other reason (final state) |
| 41 | FINAL_R <any> : Completed produced some result |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 42 | |
| 43 | === Transitions === |
| 44 | |
| 45 | New states Active states Inactive states |
| 46 | |
| 47 | +---------+ +---------+ } |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 48 | | EMPTY_N | ----> | EMPTY_A | ----+ } Empty states |
| 49 | +---------+ +---------+ | } |
| 50 | | | | ^ | +----------+ |
| 51 | | | | | +--> | FINAL_* | |
| 52 | | | V | | +----------+ |
| 53 | | | +---------+ | } |
| 54 | | | | SINGLE | ----+ } JobNode states |
| 55 | | | +---------+ | } |
| 56 | | | | | } |
| 57 | | | V | } |
| 58 | | | +---------+ | } |
| 59 | | +-------> | SINGLE+ | ----+ } |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 60 | | +---------+ | } |
| 61 | | | | |
| 62 | V V | |
| 63 | +---------+ +---------+ | } |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 64 | | LIST_N | ----> | LIST_A | ----+ } [Inactive]NodeList states |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 65 | +---------+ +---------+ | } |
| 66 | | | | | | |
| 67 | | | +--------+ | | |
| 68 | | | | V | |
| 69 | | | | +------------+ | +------------+ } |
| 70 | | +-------> | COMPLETING | --+-- | CANCELLING | } Finishing states |
| 71 | | | +------------+ +------------+ } |
| 72 | | | | ^ |
| 73 | | | | | |
| 74 | +--------+---------+--------------------+ |
| 75 | |
| 76 | |
| 77 | This state machine and its transition matrix are optimized for the common case when job is created in active |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 78 | state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes |
| 79 | successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R |
| 80 | state without going to COMPLETING state) |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 81 | |
| 82 | Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor` |
Roman Elizarov | ff0aab8 | 2018-09-22 17:11:53 +0300 | [diff] [blame] | 83 | |
| 84 | ---------- TIMELINE of state changes and notification in Job lifecycle ---------- |
| 85 | |
| 86 | | The longest possible chain of events is shown, shorter versions cut through intermediate states, |
| 87 | | while still performing all the notifications in this order. |
| 88 | |
| 89 | + Job object is created |
| 90 | ## NEW: state == EMPTY_NEW | is InactiveNodeList |
Vsevolod Tolstopyatov | f157cec | 2018-09-24 17:22:06 +0300 | [diff] [blame] | 91 | + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle) |
| 92 | ~ waits for start |
| 93 | >> start / join / await invoked |
Roman Elizarov | ff0aab8 | 2018-09-22 17:11:53 +0300 | [diff] [blame] | 94 | ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList |
Vsevolod Tolstopyatov | f157cec | 2018-09-24 17:22:06 +0300 | [diff] [blame] | 95 | + onStartInternal / onStart (lazy coroutine is started) |
| 96 | ~ active coroutine is working (or scheduled to execution) |
| 97 | >> childFailed / fail invoked |
Roman Elizarov | ff0aab8 | 2018-09-22 17:11:53 +0300 | [diff] [blame] | 98 | ## FAILING: state is Finishing, state.rootCause != null |
Vsevolod Tolstopyatov | f157cec | 2018-09-24 17:22:06 +0300 | [diff] [blame] | 99 | ------ failing listeners are not admitted anymore, invokeOnCompletion(onFailing=true) returns NonDisposableHandle |
| 100 | ------ new children get immediately cancelled, but are still admitted to the list |
| 101 | + onFailing |
| 102 | + notifyFailing (invoke all failing listeners -- cancel all children, suspended functions resume with exception) |
| 103 | + failParent (rootCause of failure is communicated to the parent, parent starts failing, too) |
| 104 | ~ waits for completion of coroutine body |
| 105 | >> makeCompleting / makeCompletingOnce invoked |
Roman Elizarov | ff0aab8 | 2018-09-22 17:11:53 +0300 | [diff] [blame] | 106 | ## COMPLETING: state is Finishing, state.isCompleting == true |
Vsevolod Tolstopyatov | f157cec | 2018-09-24 17:22:06 +0300 | [diff] [blame] | 107 | ------ new children are not admitted anymore, attachChild returns NonDisposableHandle |
| 108 | ~ waits for children |
| 109 | >> last child completes |
| 110 | - computes the final exception |
Roman Elizarov | ff0aab8 | 2018-09-22 17:11:53 +0300 | [diff] [blame] | 111 | ## SEALED: state is Finishing, state.isSealed == true |
Vsevolod Tolstopyatov | f157cec | 2018-09-24 17:22:06 +0300 | [diff] [blame] | 112 | ------ cancel/childFailed returns false (cannot handle exceptions anymore) |
| 113 | + failParent (final exception is communicated to the parent, parent incorporates it) |
| 114 | + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler) |
| 115 | ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled) |
| 116 | ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle |
| 117 | + parentHandle.dispose |
| 118 | + notifyCompletion (invoke all completion listeners) |
| 119 | + onCompletionInternal / onCompleted / onCompletedExceptionally |
Roman Elizarov | ff0aab8 | 2018-09-22 17:11:53 +0300 | [diff] [blame] | 120 | |
| 121 | --------------------------------------------------------------------------------- |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 122 | */ |
| 123 | |
// Holds the current state object of this job.
// Note: the shared EMPTY_ACTIVE / EMPTY_NEW objects are used while we have no listeners;
// which one is picked depends on the `active` constructor flag.
private val _state = atomic<Any?>(
    when {
        active -> EMPTY_ACTIVE
        else -> EMPTY_NEW
    }
)

// Handle of this job's registration in its parent.
// It is `null` until initParentJobInternal assigns it.
@Volatile
private var parentHandle: ChildHandle? = null
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 129 | |
| 130 | // ------------ initialization ------------ |
| 131 | |
/**
 * Initializes parent job: attaches this job as a child of [parent] (when there is one)
 * and remembers the resulting handle in `parentHandle`.
 * It shall be invoked at most once after construction after all other initialization.
 *
 * @param parent the parent job, or `null` when this job has no parent.
 * @throws IllegalStateException if the parent handle was already initialized.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun initParentJobInternal(parent: Job?) {
    check(parentHandle == null)
    val actualParent = parent ?: run {
        // no parent -- nothing to register with
        parentHandle = NonDisposableHandle
        return
    }
    // the parent has to be started before a child is attached to it
    actualParent.start()
    @Suppress("DEPRECATION")
    val childHandle = actualParent.attachChild(this)
    parentHandle = childHandle
    // our own state is checked only _after_ registering (see tryFinalizeSimpleState order of actions)
    if (!isCompleted) return
    childHandle.dispose()
    // drop the reference just in case, to aid GC
    parentHandle = NonDisposableHandle
}
| 153 | |
| 154 | // ------------ state query ------------ |
| 155 | |
/**
 * Returns current state of this job.
 *
 * While the stored value is an [OpDescriptor] (an in-progress atomic operation), the getter
 * performs that operation and reads the value again, so an [OpDescriptor] is never returned.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal val state: Any?
    get() {
        _state.loop { current ->
            when (current) {
                // help to complete the in-progress atomic operation, then re-read
                is OpDescriptor -> current.perform(this)
                else -> return current
            }
        }
    }
| 166 | |
/**
 * Invokes [block] with a freshly read [state] over and over again.
 * The function itself never returns normally; the loop ends only when [block] performs
 * a non-local return or throws.
 * @suppress **This is unstable API and it is subject to change.**
 */
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
    while (true) block(state)
}
| 175 | |
/** `true` when the current state is [Incomplete] and that state reports itself as active. */
public override val isActive: Boolean
    get() = (state as? Incomplete)?.isActive == true
| 180 | |
/** `true` once the current state is no longer [Incomplete]. */
public final override val isCompleted: Boolean
    get() {
        return state !is Incomplete
    }
| 182 | |
/**
 * `true` when the job has completed exceptionally, or when it is in a [Finishing] state
 * that is marked as failing.
 */
public final override val isFailed: Boolean
    get() {
        val current = state
        return when (current) {
            is CompletedExceptionally -> true
            is Finishing -> current.isFailing
            else -> false
        }
    }
| 187 | |
/**
 * `true` when the final state is [Cancelled], or when the job is in a [Finishing] state
 * that is marked as cancelling.
 */
public final override val isCancelled: Boolean
    get() {
        val current = state
        return when (current) {
            is Cancelled -> true
            is Finishing -> current.isCancelling
            else -> false
        }
    }
| 192 | |
| 193 | // ------------ state update ------------ |
| 194 | |
// Finalizes Finishing -> Completed (terminal state) transition:
// seals the state, builds the final exception and the final state object, reports the exception,
// installs the final state with CAS and runs the post-completion actions. Always returns true.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
    // consistency checks
    require(proposedUpdate !is Incomplete) // only incomplete -> completed transition is allowed
    require(this.state === state) // the current state cannot have changed
    require(!state.isSealed) // cannot be sealed yet
    require(state.isCompleting) // must be marked as completing
    val updateCause = (proposedUpdate as? CompletedExceptionally)?.cause
    val updateIsCancelled = proposedUpdate is Cancelled
    // Seal the state so that no more exceptions can be added and pick the final exception
    var anySuppressed = false
    val finalCause = synchronized(state) {
        val collected = state.sealLocked(updateCause)
        getFinalRootCause(state, collected)?.also { root ->
            anySuppressed = suppressExceptions(root, collected)
        }
    }
    // Build the final state object
    val finalState: Any? =
        if (finalCause == null) {
            // we have not failed -> the proposed update value is the result
            proposedUpdate
        } else if (finalCause === updateCause && updateIsCancelled == state.isCancelling) {
            // small optimization: the proposed update object already describes this failure
            proposedUpdate
        } else if (state.isCancelling) {
            // cancelled job final state
            Cancelled(finalCause)
        } else {
            // failed job final state
            CompletedExceptionally(finalCause)
        }

    // Handle the exception ourselves when the parent cannot handle it
    if (finalCause != null && !failParent(finalCause)) handleJobException(finalCause)
    // CAS to the completed state -> it must succeed
    require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
    // Process all post-completion actions
    completeStateFinalization(state, finalState, mode, anySuppressed)
    return true
}
| 234 | |
// Selects the root cause of completion out of the exceptions collected in the finishing state.
// Returns null only when there are no exceptions and the state is not cancelling.
private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
    if (exceptions.isEmpty()) {
        // no exceptions at all: materialize the cancellation exception if the job is cancelling
        return if (state.isCancelling) createJobCancellationException() else null
    }
    /*
     * This is a place where we step on our API limitation:
     * an internal JobCancellationException coming from our parent cannot be distinguished
     * from an external cancellation, thus all exceptions have to be collected.
     *
     * The negative consequence: the same exception can be added as suppressed more than once.
     * Consider concurrent parent-child relationship:
     * 1) Child throws E1 and parent throws E2.
     * 2) Parent goes to "Failing(E1)" and cancels child with E1.
     * 3) Child goes to "Cancelling(E1)", but throws an exception E2.
     * 4) When child throws, it notifies parent that it is failing, adding its exception
     *    to parent's list of exceptions.
     * 5) Child builds final exception: E1 with suppressed E2, reports it to parent.
     * 6) Parent aggregates three exceptions: original E1, reported E2 and "final" E1.
     *    It filters the third exception, but adds the second one to the first one,
     *    thus adding suppressed duplicate.
     *
     * Note that it only happens when both parent and child throw exception simultaneously.
     */
    val first = exceptions[0]
    // anything but a JobCancellationException is used as the root cause as is
    if (first !is JobCancellationException) return first
    // a JobCancellationException with a real underlying cause -> that cause is the root
    unwrap(first)?.let { return it }
    // otherwise take the first exception that unwraps to something, or fall back to the first one
    // NOTE(review): the element itself is returned here, not its unwrapped cause -- confirm this is intended
    return exceptions.firstOrNull { unwrap(it) != null } ?: first
}
| 270 | |
// Adds the unwrapped exceptions (all but the first element of the list) to rootCause as suppressed ones.
// Exceptions that unwrap to nothing, that are the rootCause itself, or that were already added
// (compared by identity) are skipped. Returns true when at least one exception was added.
private fun suppressExceptions(rootCause: Throwable, exceptions: List<Throwable>): Boolean {
    if (exceptions.size <= 1) return false // nothing more to do here
    val alreadyAdded = identitySet<Throwable>(exceptions.size)
    var anyAdded = false
    for (index in 1..exceptions.lastIndex) {
        val candidate = unwrap(exceptions[index]) ?: continue
        if (candidate === rootCause) continue
        if (!alreadyAdded.add(candidate)) continue
        rootCause.addSuppressedThrowable(candidate)
        anyAdded = true
    }
    return anyAdded
}
| 286 | |
// Strips JobCancellationException wrappers by following the cause chain.
// Returns the first exception in the chain that is not a JobCancellationException,
// or null when the chain ends with a JobCancellationException that has no cause.
private fun unwrap(exception: Throwable): Throwable? {
    var current: Throwable = exception
    while (current is JobCancellationException) {
        current = current.cause ?: return null
    }
    return current
}
Vsevolod Tolstopyatov | 06f57aa | 2018-07-24 19:51:21 +0300 | [diff] [blame] | 294 | |
// Fast-path method to finalize normally completed coroutines without children.
// Returns false when the CAS from state to update fails; otherwise runs post-completion actions and returns true.
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
    // only simple states without lists (where children can be concurrently added) are allowed here
    check(state is Empty || state is JobNode<*>)
    // only for normal completion
    check(update !is CompletedExceptionally)
    val installed = _state.compareAndSet(state, update)
    if (installed) completeStateFinalization(state, update, mode, suppressed = false)
    return installed
}
| 303 | |
// Runs the post-completion actions after the final state `update` was installed in place of `state`.
// suppressed == true when any exceptions were suppressed while building the final completion cause
private fun completeStateFinalization(state: Incomplete, update: Any?, mode: Int, suppressed: Boolean) {
    /*
     * The job is in THE FINAL state now and the resulting state has to be handled properly.
     * The order of the invocations below is important.
     *
     * 1) Unregister from parent job.
     */
    val registration = parentHandle // volatile read of parentHandle _after_ state was updated
    if (registration != null) {
        registration.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
    val cause = (update as? CompletedExceptionally)?.cause
    /*
     * 2) Invoke onFailing: for resource cancellation etc.
     *    It is invoked only if the previous state was not failing already (not notified yet).
     *    Note: notifyFailing is not used here, since all completion handlers are invoked as the next step.
     */
    if (!state.isFailing) onFailing(cause)
    /*
     * 3) Invoke completion handlers: .join(), callbacks etc.
     *    It is important to invoke them only AFTER exception handling, see #208
     */
    when (state) {
        is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler (common case)
            try {
                state.invoke(cause)
            } catch (ex: Throwable) {
                handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
            }
        }
        else -> state.list?.notifyCompletion(cause)
    }
    /*
     * 4) Invoke onCompletionInternal: onNext(), timeout de-registration etc.
     *    It should be last so that all callbacks observe a consistent state
     *    of the job which does not depend on callback scheduling.
     */
    onCompletionInternal(update, mode, suppressed)
}
| 343 | |
// Performs the notifications for a job that starts failing with the given cause, in this order:
// onFailing hook, all JobFailingNode handlers from the list, then the parent.
private fun notifyFailing(list: NodeList, cause: Throwable) {
    // our own children are cancelled first
    onFailing(cause)
    notifyHandlers<JobFailingNode<*>>(list, cause)
    // only then the parent is told that we are failing;
    // this is a tentative failure report -- its result is ignored, so it does not matter if there is no parent
    failParent(cause)
}
| 351 | |
// Invokes every JobNode handler of this list with the given completion cause.
private fun NodeList.notifyCompletion(cause: Throwable?) {
    notifyHandlers<JobNode<*>>(this, cause)
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 354 | |
// Invokes all handlers of type T from the list with the given cause.
// A throwing handler does not stop the iteration: the first exception is wrapped into
// CompletionHandlerException, the following ones are added to it as suppressed, and the
// aggregate is passed to handleOnCompletionException after all handlers were invoked.
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
    var firstFailure: Throwable? = null
    list.forEach<T> { node ->
        try {
            node.invoke(cause)
        } catch (ex: Throwable) {
            val recorded = firstFailure
            if (recorded == null) {
                firstFailure = CompletionHandlerException("Exception in completion handler $node for $this", ex)
            } else {
                recorded.addSuppressedThrowable(ex)
            }
        }
    }
    val failure = firstFailure
    if (failure != null) handleOnCompletionException(failure)
}
| 368 | |
/**
 * Tries to start this job, retrying for as long as `startInternal` asks for a retry.
 *
 * @return `true` when this invocation has started the job, `false` when the job was not in a new state.
 */
public final override fun start(): Boolean {
    loopOnState { current ->
        val outcome = startInternal(current)
        if (outcome == FALSE) return false
        if (outcome == TRUE) return true
        // any other outcome (RETRY) -- go for another iteration with a fresh state
    }
}
| 377 | |
// Makes a single attempt to move the given state from new to active.
// returns: RETRY/FALSE/TRUE:
//   FALSE when the state is not new,
//   TRUE when started (onStartInternal was invoked),
//   RETRY when the state update failed and the caller needs to retry
private fun startInternal(state: Any?): Int {
    // figure out the active counterpart of the given new state
    val activated: Any? = when (state) {
        // EMPTY_X state -- no completion handlers
        is Empty -> if (state.isActive) return FALSE else EMPTY_ACTIVE
        // LIST state -- inactive with a list of completion handlers
        is InactiveNodeList -> state.list
        // not a new state
        else -> return FALSE
    }
    if (!_state.compareAndSet(state, activated)) return RETRY
    onStartInternal()
    return TRUE
}
| 398 | |
/**
 * Override to provide the actual [start] action.
 * This function is invoked exactly once when non-active coroutine is [started][start].
 * The default implementation does nothing.
 */
internal open fun onStartInternal() {
    // nothing to do by default
}
| 404 | |
/**
 * Returns a [CancellationException] that describes the failure or completion of this job.
 *
 * For a finishing job with a root cause and for an exceptionally completed job the cause is
 * returned as is when it already is a [CancellationException], otherwise it is wrapped into
 * a [JobCancellationException]. A normally completed job produces a [JobCancellationException]
 * without a cause.
 *
 * @throws IllegalStateException when the job is still new or active.
 */
public final override fun getCancellationException(): CancellationException {
    val current = state
    if (current is Finishing) {
        // finishing without a root cause means the job is not failing yet
        val root = current.rootCause ?: error("Job is still new or active: $this")
        return root.toCancellationException("Job is failing")
    }
    if (current is Incomplete) error("Job is still new or active: $this")
    if (current is CompletedExceptionally) return current.cause.toCancellationException("Job has failed")
    return JobCancellationException("Job has completed normally", null, this)
}
| 415 | |
// Returns this throwable itself when it already is a CancellationException,
// otherwise wraps it into a JobCancellationException of this job with the given message.
private fun Throwable.toCancellationException(message: String): CancellationException {
    if (this is CancellationException) return this
    return JobCancellationException(message, this, this@JobSupport)
}
| 418 | |
/**
 * Returns the cause that signals the completion of this job -- it returns the original
 * [cancel] cause, [JobCancellationException] or **`null` if this job had completed normally**.
 * This function throws [IllegalStateException] when invoked for a job that has neither
 * [completed][isCompleted] nor started failing yet.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun getCompletionCause(): Throwable? {
    val current = state
    return when (current) {
        // a finishing job without a root cause is not failing yet
        is Finishing -> current.rootCause
            ?: error("Job is still new or active: $this")
        is Incomplete -> error("Job is still new or active: $this")
        is CompletedExceptionally -> current.cause
        // completed normally
        else -> null
    }
}
| 436 | |
/**
 * Delegates to the three-argument [invokeOnCompletion] with `onFailing = false`
 * and `invokeImmediately = true`.
 */
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onFailing = false, invokeImmediately = true, handler = handler)
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 440 | |
/**
 * Delegates to the three-argument [invokeOnCompletion], passing [onCancelling] as `onFailing`
 * and `invokeImmediately = true`.
 */
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle {
    return invokeOnCompletion(onFailing = onCancelling, invokeImmediately = true, handler = handler)
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 444 | |
/**
 * Delegates to the three-argument [invokeOnCompletion], passing [onCancelling_] as `onFailing`
 * and `invokeImmediately = true`.
 */
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(onCancelling_: Boolean, handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onFailing = onCancelling_, invokeImmediately = true, handler = handler)
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 448 | |
// todo: non-final as a workaround for KT-21968, should be final in the future
// Installs [handler] on this job. The state is re-read in a loop until the handler's node is either
// stored into the job state (that node is returned as the handle) or the job turns out to be
// failing/complete, in which case the handler is invoked on the spot when [invokeImmediately] is set.
public override fun invokeOnCompletion(
    onFailing: Boolean,
    invokeImmediately: Boolean,
    handler: CompletionHandler
): DisposableHandle {
    var cachedNode: JobNode<*>? = null // built at most once, reused across retries
    loopOnState { state ->
        when (state) {
            is Empty -> { // EMPTY_X state -- no completion handlers
                if (state.isActive) {
                    // try to move to SINGLE state
                    val node = cachedNode ?: makeNode(handler, onFailing).also { cachedNode = it }
                    if (_state.compareAndSet(state, node)) return node
                } else {
                    // promote first, so that a listener can be added to a non-active coroutine on retry
                    promoteEmptyToNodeList(state)
                }
            }
            is Incomplete -> {
                val list = state.list
                if (list == null) { // SINGLE/SINGLE+
                    promoteSingleToNodeList(state as JobNode<*>)
                } else {
                    var failureCause: Throwable? = null
                    var handle: DisposableHandle = NonDisposableHandle
                    if (onFailing && state is Finishing) {
                        synchronized(state) {
                            // check whether a failing handler is being installed on a job that is failing
                            failureCause = state.rootCause // != null if we are failing
                            // The node is added to the list in two cases --- either the job is not failing
                            // or a child is being added to a coroutine that is not completing yet
                            if (failureCause == null || handler.isHandlerOf<ChildJob>() && !state.isCompleting) {
                                // Note: the node is added while holding the lock on state (so it cannot change)
                                val node = cachedNode ?: makeNode(handler, onFailing).also { cachedNode = it }
                                if (!addLastAtomic(state, list, node)) return@loopOnState // retry
                                // not failing yet -- nothing to invoke, the node itself is the handle
                                if (failureCause == null) return node
                                // otherwise the handler is invoked outside of the synchronized section
                                handle = node
                            }
                        }
                    }
                    if (failureCause == null) {
                        val node = cachedNode ?: makeNode(handler, onFailing).also { cachedNode = it }
                        if (addLastAtomic(state, list, node)) return node
                    } else {
                        // Note: attachChild uses invokeImmediately, so it gets invoked when adding to failing job
                        if (invokeImmediately) handler.invokeIt(failureCause)
                        return handle
                    }
                }
            }
            else -> { // is complete
                // :KLUDGE: The handler has to be invoked in a platform-specific way via the `invokeIt` extension,
                // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
                return NonDisposableHandle
            }
        }
    }
}
| 509 | |
// Produces the list node for [handler]: a handler that already is a node of the requested flavour is
// reused as-is (it must belong to this job), any other handler is wrapped into a fresh node.
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
    if (onCancelling) {
        val failingNode = handler as? JobFailingNode<*> ?: return InvokeOnFailing(this, handler)
        require(failingNode.job === this)
        return failingNode
    }
    val completionNode = handler as? JobNode<*> ?: return InvokeOnCompletion(this, handler)
    require(completionNode.job === this && completionNode !is JobFailingNode)
    return completionNode
}
| 518 | |
// Appends [node] to [list] on the condition that the job state is still the [expect] instance.
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>): Boolean =
    list.addLastIf(node) { state === expect }
| 521 | |
// Single attempt to replace the empty [state] with a list-based state of the same activity;
// the CAS result is ignored, callers are expected to re-read the state.
private fun promoteEmptyToNodeList(state: Empty) {
    val handlers = NodeList()
    val replacement = when {
        state.isActive -> handlers
        else -> InactiveNodeList(handlers)
    }
    _state.compareAndSet(state, replacement)
}
| 528 | |
// Single attempt to turn a lone handler node into a list-based state.
private fun promoteSingleToNodeList(state: JobNode<*>) {
    // try to promote it to list (SINGLE+ state)
    state.addOneIfEmpty(NodeList())
    // now it is either in SINGLE+ state or the state has changed (the node could have been removed from state);
    // its next node is either our NodeList or the one of whoever won the race.
    // Just attempt to publish that list if the state is still the same, then the lock-free loop continues.
    _state.compareAndSet(state, state.nextNode)
}
| 537 | |
public final override suspend fun join() {
    // slow-path: there is something to wait for
    if (joinInternal()) return joinSuspend()
    // fast-path: no wait, do not suspend -- only check the completion of the calling context
    coroutineContext.checkCompletion()
}
| 545 | |
// Returns `false` when the job is already complete (no need to wait) and `true` when the caller has to wait.
private fun joinInternal(): Boolean {
    loopOnState { current ->
        val incomplete = current as? Incomplete ?: return false // not active anymore (complete)
        // a negative result of startInternal means "retry", anything else means "wait"
        if (startInternal(incomplete) >= 0) return true
    }
}
| 552 | |
// Suspends the caller and registers a completion handler that resumes it.
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { continuation ->
    val resumeHandler = ResumeOnCompletion(this, continuation).asHandler
    // The registration is disposed only when the waiting continuation gets cancelled;
    // on completion of this job the continuation is resumed regularly by the handler.
    continuation.disposeOnCancellation(invokeOnCompletion(handler = resumeHandler))
}
| 557 | |
// The job itself serves as its own select clause.
public final override val onJoin: SelectClause0
    get() {
        return this
    }
| 560 | |
// registerSelectJoin
public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
    loopOnState { current ->
        // somebody else has already selected -- nothing to register
        if (select.isSelected) return
        if (current !is Incomplete) {
            // fast-path: the job is already complete -- select the result right here
            if (select.trySelect(null)) {
                select.completion.context.checkCompletion() // always check for our completion
                block.startCoroutineUnintercepted(select.completion)
            }
            return
        }
        if (startInternal(current) == 0) {
            // slow-path: register a waiter for completion, disposed when the select picks something
            val waiter = SelectJoinOnCompletion(this, select, block).asHandler
            select.disposeOnSelect(invokeOnCompletion(handler = waiter))
            return
        }
        // any other result of startInternal -- go around the loop once more
    }
}
| 581 | |
/**
 * Unregisters [node] from this job; what has to be done depends on the current state of the job.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun removeNode(node: JobNode<*>) {
    loopOnState { current ->
        if (current is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler
            if (current !== node) return // a different job node --> we were already removed
            // try to remove it by reverting back to the empty state, retry the loop on failure
            if (_state.compareAndSet(current, EMPTY_ACTIVE)) return
        } else {
            // an incomplete state may have a list of completion handlers -- remove the node from it;
            // a complete state does not have any completion handlers
            if (current is Incomplete && current.list != null) node.remove()
            return
        }
    }
}
| 603 | |
/**
 * Returns `true` for jobs that do not have a "body block" to complete and should immediately go into
 * the completing state and start waiting for children.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open val onFailComplete: Boolean
    get() = false
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 611 | |
// external cancel with (optional) cause
public override fun cancel(cause: Throwable?): Boolean {
    val accepted = fail(cause, cancel = true)
    return accepted && handlesException
}
| 615 | |
// child is reporting failure to the parent
internal fun childFailed(cause: Throwable): Boolean {
    val accepted = fail(cause, cancel = false)
    return accepted && handlesException
}
| 619 | |
// parent is cancelling child: the parent job itself is passed as the cause, the result is not needed
public override fun cancelChild(parentJob: Job) {
    fail(cause = parentJob, cancel = true)
}
| 624 | |
// cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
// returns true if exception was handled, false otherwise
//
// When onFailComplete is set, the job is first pushed into completing; a `true` result of
// failMakeCompleting means that it was made completing and the exception was recorded.
// In every other case the failure is just recorded via makeFailing.
private fun fail(cause: Any?, cancel: Boolean): Boolean =
    (onFailComplete && failMakeCompleting(cause, cancel)) || makeFailing(cause, cancel)
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 636 | |
// Tries to move the job into completing with a failure built from [cause]; returns `false` when the job
// is already completed/completing, `true` when this call made it complete or start waiting for children.
private fun failMakeCompleting(cause: Any?, cancel: Boolean): Boolean {
    loopOnState { current ->
        // already completed/completing, do not even propose update
        if (current !is Incomplete) return false
        if (current is Finishing && current.isCompleting) return false
        val failure = createFailure(createCauseException(cause), cancel)
        val outcome = tryMakeCompleting(current, failure, mode = MODE_ATOMIC_DEFAULT)
        when (outcome) {
            COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true
            COMPLETING_ALREADY_COMPLETING -> return false
            COMPLETING_RETRY -> return@loopOnState
            else -> error("unexpected result")
        }
    }
}
| 651 | |
// Default cancellation exception of this job, used when no explicit cause is supplied.
private fun createJobCancellationException(): JobCancellationException {
    return JobCancellationException("Job was cancelled", null, this)
}
| 654 | |
// cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
private fun createCauseException(cause: Any?): Throwable {
    if (cause == null) return createJobCancellationException()
    if (cause is Throwable) return cause
    // anything else has to be a Job -- take its cancellation exception
    return (cause as Job).getCancellationException()
}
| 660 | |
// Wraps the exception into the final-state object: Cancelled for cancellation, CompletedExceptionally otherwise.
private fun createFailure(causeException: Throwable, cancel: Boolean): CompletedExceptionally =
    if (cancel) Cancelled(causeException) else CompletedExceptionally(causeException)
| 666 | |
// transitions to Failing state
// cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
// returns false when the job is already complete or its finishing state is already sealed
private fun makeFailing(cause: Any?, cancel: Boolean): Boolean {
    var cachedException: Throwable? = null // lazily initialized result of createCauseException(cause)
    loopOnState { state ->
        when (state) {
            is Finishing -> { // already finishing -- collect exceptions
                val causeToNotify = synchronized(state) {
                    // too late, already sealed -- cannot add exception nor mark cancelled
                    if (state.isSealed) return false
                    // add exception, do nothing if parent is cancelling child that is already failing
                    val wasFailing = state.isFailing // will notify if it was not failing
                    // Materialize missing exception if it is the first exception (otherwise -- don't)
                    if (cause != null || !wasFailing) {
                        val exception = cachedException ?: createCauseException(cause).also { cachedException = it }
                        state.addExceptionLocked(exception)
                    }
                    // mark as cancelling if cancel was requested
                    if (cancel) state.isCancelling = true
                    // take the cause for notification if it was not failing before
                    state.rootCause.takeIf { !wasFailing }
                }
                // listeners are notified outside of the synchronized section
                causeToNotify?.let { notifyFailing(state.list, it) }
                return true
            }
            is Incomplete -> {
                // Not yet finishing -- try to make it failing
                val list = tryPromoteToList(state) ?: return@loopOnState
                val exception = cachedException ?: createCauseException(cause).also { cachedException = it }
                if (state.isActive) {
                    // active state becomes failing
                    if (tryMakeFailing(state, list, exception, cancel)) return true
                } else {
                    // non active state starts completing
                    when (tryMakeCompleting(state, createFailure(exception, cancel), mode = MODE_ATOMIC_DEFAULT)) {
                        COMPLETING_ALREADY_COMPLETING -> error("Cannot happen in $state")
                        COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true // ok
                        COMPLETING_RETRY -> return@loopOnState
                        else -> error("unexpected result")
                    }
                }
            }
            else -> return false // already complete
        }
    }
}
| 712 | |
// Returns the list of an incomplete state when it has one; otherwise makes one promotion attempt
// and returns null, which tells the caller to retry with a freshly read state.
private fun tryPromoteToList(state: Incomplete): NodeList? {
    state.list?.let { return it }
    when (state) {
        is Empty -> promoteEmptyToNodeList(state)
        is JobNode<*> -> promoteSingleToNodeList(state)
        else -> error("State should have list: $state")
    }
    return null
}
| 721 | |
// Tries to install a new failing state on the condition that the job is still in the expected state;
// listeners are notified only when the state was actually replaced.
private fun tryMakeFailing(state: Incomplete, list: NodeList, rootCause: Throwable, cancel: Boolean): Boolean {
    check(state !is Finishing) // only for non-finishing states
    check(state.isActive) // only for active states
    // the failing state is created with rootCause already set
    val installed = _state.compareAndSet(state, Finishing(list, cancel, false, rootCause))
    if (installed) notifyFailing(list, rootCause)
    return installed
}
| 733 | |
/**
 * This function is used by [CompletableDeferred.complete] (and exceptionally) and by [JobImpl.cancel].
 * It returns `false` on repeated invocation (when this job is already completing) and `true` when
 * this invocation completed the job or made it wait for its children.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun makeCompleting(proposedUpdate: Any?): Boolean {
    loopOnState { current ->
        when (tryMakeCompleting(current, proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
            COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true
            COMPLETING_ALREADY_COMPLETING -> return false
            COMPLETING_RETRY -> return@loopOnState
            else -> error("unexpected result")
        }
    }
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 748 | |
/**
 * This function is used by [AbstractCoroutine.resume].
 * It throws exception on repeated invocation (when this job is already completing).
 *
 * Returns:
 * * `true` if state was updated to completed/cancelled;
 * * `false` if made completing or it is cancelling and is waiting for children.
 *
 * @throws IllegalStateException if job is already complete or completing
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean {
    loopOnState { current ->
        when (tryMakeCompleting(current, proposedUpdate, mode)) {
            COMPLETING_COMPLETED -> return true
            COMPLETING_WAITING_CHILDREN -> return false
            COMPLETING_RETRY -> return@loopOnState
            COMPLETING_ALREADY_COMPLETING -> {
                // the exception carried by the rejected update (if any) is attached as the cause
                val message = "Job $this is already complete or completing, " +
                    "but is being completed with $proposedUpdate"
                throw IllegalStateException(message, proposedUpdate.exceptionOrNull)
            }
            else -> error("unexpected result")
        }
    }
}
| 770 | |
// Makes one attempt to move [state] towards completion with [proposedUpdate] and reports the outcome
// as one of the COMPLETING_* codes; COMPLETING_RETRY asks the caller to re-read the state and call again.
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?, mode: Int): Int {
    if (state !is Incomplete) return COMPLETING_ALREADY_COMPLETING
    /*
     * FAST PATH -- no children to wait for && simple state (no list) && not failing => can complete immediately
     * Failures always have to go through Finishing state to serialize exception handling.
     * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
     * which may miss unhandled exception.
     */
    if ((state is Empty || state is JobNode<*>) && state !is ChildJob && proposedUpdate !is CompletedExceptionally) {
        return if (tryFinalizeSimpleState(state, proposedUpdate, mode)) COMPLETING_COMPLETED else COMPLETING_RETRY
    }
    // get state's list or else promote to list to correctly operate on child lists
    val list = tryPromoteToList(state) ?: return COMPLETING_RETRY
    // Promote to Finishing state if we are not in it yet.
    // This promotion has to be atomic w.r.t. the state change, so that a coroutine that is not active yet
    // atomically transitions to finishing & completing state
    val finishing = state as? Finishing ?: Finishing(list, false, false, null)
    // must synchronize updates to finishing state
    var causeToNotify: Throwable? = null
    synchronized(finishing) {
        // check if this state is already completing
        if (finishing.isCompleting) return COMPLETING_ALREADY_COMPLETING
        // mark as completing
        finishing.isCompleting = true
        // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
        require(!finishing.isSealed) // cannot be sealed
        // mark as cancelling if the proposed update is to cancel
        if (proposedUpdate is Cancelled) finishing.isCancelling = true
        // add new proposed exception to the finishing state
        val wasFailing = finishing.isFailing
        (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
        // If it just becomes failing --> must process failing notifications
        causeToNotify = finishing.rootCause.takeIf { !wasFailing }
    }
    // if we need to promote to finishing then atomically do it here
    if (finishing !== state && !_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
    // process failing notification here -- it cancels all the children _before_ we start to wait for them (sic!!!)
    causeToNotify?.let { notifyFailing(list, it) }
    // now wait for children
    val child = firstChild(state)
    if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN
    // otherwise -- we have no children left (all were already cancelled?)
    if (tryFinalizeFinishingState(finishing, proposedUpdate, mode)) return COMPLETING_COMPLETED
    // otherwise retry
    return COMPLETING_RETRY
}
| 823 | |
// The exception carried by a CompletedExceptionally value, or null for any other value.
private val Any?.exceptionOrNull: Throwable?
    get() = if (this is CompletedExceptionally) this.cause else null
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 826 | |
// The state itself when it is a single child node, otherwise the first child found in the state's list (if any).
private fun firstChild(state: Incomplete): ChildJob? =
    if (state is ChildJob) state else state.list?.nextChild()
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 829 | |
// return false when there is no more incomplete children to wait
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun tryWaitForChild(state: Finishing, child: ChildJob, proposedUpdate: Any?): Boolean {
    var candidate = child
    while (true) {
        val handle = candidate.childJob.invokeOnCompletion(
            invokeImmediately = false,
            handler = ChildCompletion(this, state, candidate, proposedUpdate).asHandler
        )
        // a real handle means the child is not complete and we've started waiting for it
        if (handle !== NonDisposableHandle) return true
        // this child is already complete -- move on to the next one
        candidate = candidate.nextChild() ?: return false
    }
}
| 841 | |
// Resumes completion after [lastChild]: either starts waiting for the next child or tries to finalize the state.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun continueCompleting(state: Finishing, lastChild: ChildJob, proposedUpdate: Any?) {
    require(this.state === state) // consistency check -- it cannot change while we are waiting for children
    // figure out if there is a next child and try to wait for it
    val nextToAwait = lastChild.nextChild()
    if (nextToAwait != null && tryWaitForChild(state, nextToAwait, proposedUpdate)) return // waiting for next child
    // no more children to wait -- try update state, the outcome is not used here
    tryFinalizeFinishingState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)
}
| 852 | |
// Finds the closest ChildJob node after this one, skipping removed nodes;
// returns null when the list head is reached first.
private fun LockFreeLinkedListNode.nextChild(): ChildJob? {
    var node = this
    // rollback to prev non-removed (or list head)
    while (node.isRemoved) node = node.prevNode
    while (true) {
        node = node.nextNode
        if (!node.isRemoved) {
            if (node is ChildJob) return node
            if (node is NodeList) return null // checked all -- no more children
        }
    }
}
| 863 | |
// Lazily yields the jobs of the child nodes found in the state that is read when iteration starts.
public final override val children: Sequence<Job>
    get() = buildSequence {
        val snapshot = this@JobSupport.state
        if (snapshot is ChildJob) {
            // the state is a single child node
            yield(snapshot.childJob)
        } else if (snapshot is Incomplete) {
            val list = snapshot.list
            if (list != null) list.forEach<ChildJob> { yield(it.childJob) }
        }
    }
| 873 | |
@Suppress("OverridingDeprecatedMember")
public final override fun attachChild(child: Job): ChildHandle {
    /*
     * Note: This function attaches a special ChildJob node object. This node object
     * is handled in a special way on completion of the coroutine (we wait for all of them) and
     * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
     * if the job is already failing. For the "failing" state the child is attached under the state lock.
     * That is required to properly wait for all children before completion and to provide a linearizable
     * view of the hierarchy: if a child is attached when the job is failing, such a child receives an
     * immediate cancellation exception, but the parent *will* wait for that child before completion
     * and will handle its exception.
     */
    val childNode = ChildJob(this, child)
    return invokeOnCompletion(onFailing = true, handler = childNode.asHandler) as ChildHandle
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 887 | |
// Member kept as an override of a deprecated declaration; the work is delegated to the extension function.
// NOTE(review): this relies on the call below resolving to the extension and not to this member -- confirm.
@Suppress("OverridingDeprecatedMember")
public final override fun cancelChildren(cause: Throwable?) {
    this.cancelChildren(cause) // use extension function
}
| 892 | |
/**
 * Override to process any exceptions that were encountered while invoking completion handlers
 * installed via [invokeOnCompletion]. This base implementation rethrows [exception].
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun handleOnCompletionException(exception: Throwable) {
    throw exception
}
| 901 | |
/**
 * This function is invoked once when job is failing or is completed.
 * It's an optimization for [invokeOnCompletion] with `onCancelling` set to `true`.
 * The base implementation does nothing.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onFailing(cause: Throwable?) {
}
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 909 | |
/**
 * When this property is `true` the parent fails on the failure of this job. It is `false` unless overridden.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open val failsParent: Boolean
    get() = false
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 916 | |
/**
 * Returns `true` for jobs that handle their exceptions via [handleJobException] or integrate them
 * into the job's result via [onCompletionInternal]. The only instance of the [Job] that does not
 * handle its exceptions is [JobImpl].
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open val handlesException: Boolean
    get() = true
Roman Elizarov | 67912f9 | 2018-09-16 01:46:43 +0300 | [diff] [blame] | 925 | |
/**
 * This method is invoked **exactly once** when the final exception of the job is determined
 * and before it becomes complete. At the moment of invocation the job and all its children are complete.
 * The base implementation does nothing.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun handleJobException(exception: Throwable) {
}
Vsevolod Tolstopyatov | 06f57aa | 2018-07-24 19:51:21 +0300 | [diff] [blame] | 933 | |
// Reports [cause] to the parent when this job is configured to fail its parent.
// A CancellationException is never reported and counts as handled.
private fun failParent(cause: Throwable): Boolean = when {
    cause is CancellationException -> true
    !failsParent -> false
    else -> parentHandle?.childFailed(cause) == true // no parent handle --> not handled
}
| 939 | |
/**
 * Override for post-completion actions that need to do something with the state.
 * The base implementation does nothing.
 *
 * @param state the final state.
 * @param mode completion mode.
 * @param suppressed true when any exceptions were suppressed while building the final completion cause.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
}
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 948 | |
| 949 | // for nicer debugging |
| 950 | public override fun toString(): String = |
| 951 | "${nameString()}{${stateString()}}@$hexAddress" |
| 952 | |
| 953 | /** |
| 954 | * @suppress **This is unstable API and it is subject to change.** |
| 955 | */ |
| 956 | internal open fun nameString(): String = classSimpleName |
| 957 | |
| 958 | private fun stateString(): String { |
| 959 | val state = this.state |
| 960 | return when (state) { |
| 961 | is Finishing -> buildString { |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 962 | when { // cancelling implies failing |
| 963 | state.isCancelling -> append("Cancelling") |
| 964 | state.isFailing -> append("Failing") |
| 965 | else -> append("Active") |
| 966 | } |
| 967 | if (state.isCompleting) append("Completing") |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 968 | } |
| 969 | is Incomplete -> if (state.isActive) "Active" else "New" |
| 970 | is Cancelled -> "Cancelled" |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 971 | is CompletedExceptionally -> "Failed" |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 972 | else -> "Completed" |
| 973 | } |
| 974 | } |
| 975 | |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 976 | // Completing, Failing, Cancelling states, |
| 977 | // All updates are guarded by synchronized(this), reads are volatile |
Vsevolod Tolstopyatov | c9afb67 | 2018-07-24 20:30:48 +0300 | [diff] [blame] | 978 | @Suppress("UNCHECKED_CAST") |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 979 | private class Finishing( |
| 980 | override val list: NodeList, |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 981 | @Volatile |
| 982 | @JvmField var isCancelling: Boolean, |
| 983 | @Volatile |
| 984 | @JvmField var isCompleting: Boolean, |
| 985 | @Volatile |
| 986 | @JvmField var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED |
Roman Elizarov | 4e33cc6 | 2018-09-05 01:15:44 +0300 | [diff] [blame] | 987 | ) : SynchronizedObject(), Incomplete { |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 988 | @Volatile |
| 989 | private var _exceptionsHolder: Any? = null // Contains null | Throwable | ArrayList | SEALED |
Vsevolod Tolstopyatov | 06f57aa | 2018-07-24 19:51:21 +0300 | [diff] [blame] | 990 | |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 991 | // NotE: cannot be modified when sealed |
| 992 | val isSealed: Boolean get() = _exceptionsHolder === SEALED |
| 993 | val isFailing: Boolean get() = rootCause != null |
| 994 | override val isActive: Boolean get() = !isFailing |
| 995 | |
| 996 | // Seals current state and returns list of exceptions |
Roman Elizarov | 563da40 | 2018-08-10 19:18:56 +0300 | [diff] [blame] | 997 | // guarded by `synchronized(this)` |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 998 | fun sealLocked(proposedException: Throwable?): List<Throwable> { |
| 999 | val eh = _exceptionsHolder // volatile read |
| 1000 | val list = when(eh) { |
| 1001 | null -> allocateList() |
| 1002 | is Throwable -> allocateList().also { it.add(eh) } |
| 1003 | is ArrayList<*> -> eh as ArrayList<Throwable> |
| 1004 | else -> error("State is $eh") // already sealed -- cannot happen |
| 1005 | } |
| 1006 | val rootCause = this.rootCause // volatile read |
| 1007 | rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning |
| 1008 | if (proposedException != null && proposedException != rootCause) list.add(proposedException) |
| 1009 | _exceptionsHolder = SEALED |
| 1010 | return list |
Vsevolod Tolstopyatov | c9afb67 | 2018-07-24 20:30:48 +0300 | [diff] [blame] | 1011 | } |
| 1012 | |
Roman Elizarov | 563da40 | 2018-08-10 19:18:56 +0300 | [diff] [blame] | 1013 | // guarded by `synchronized(this)` |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 1014 | fun addExceptionLocked(exception: Throwable) { |
| 1015 | val rootCause = this.rootCause // volatile read |
| 1016 | if (rootCause == null) { |
| 1017 | this.rootCause = exception |
| 1018 | return |
Vsevolod Tolstopyatov | 06f57aa | 2018-07-24 19:51:21 +0300 | [diff] [blame] | 1019 | } |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 1020 | if (exception === rootCause) return // nothing to do |
| 1021 | val eh = _exceptionsHolder // volatile read |
| 1022 | when (eh) { |
| 1023 | null -> _exceptionsHolder = exception |
Vsevolod Tolstopyatov | c9afb67 | 2018-07-24 20:30:48 +0300 | [diff] [blame] | 1024 | is Throwable -> { |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 1025 | if (exception === eh) return // nothing to do |
| 1026 | _exceptionsHolder = allocateList().apply { |
| 1027 | add(eh) |
Roman Elizarov | 563da40 | 2018-08-10 19:18:56 +0300 | [diff] [blame] | 1028 | add(exception) |
| 1029 | |
| 1030 | } |
Vsevolod Tolstopyatov | c9afb67 | 2018-07-24 20:30:48 +0300 | [diff] [blame] | 1031 | } |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 1032 | is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception) |
| 1033 | else -> error("State is $eh") // already sealed -- cannot happen |
Vsevolod Tolstopyatov | 91ecee8 | 2018-08-07 18:24:00 +0300 | [diff] [blame] | 1034 | } |
| 1035 | } |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 1036 | |
| 1037 | private fun allocateList() = ArrayList<Throwable>(4) |
| 1038 | |
| 1039 | override fun toString(): String = |
| 1040 | "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$_exceptionsHolder, list=$list]" |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1041 | } |
| 1042 | |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 1043 | private val Incomplete.isFailing: Boolean |
| 1044 | get() = this is Finishing && isFailing |
| 1045 | |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1046 | private val Incomplete.isCancelling: Boolean |
Roman Elizarov | ecbc85c | 2018-09-14 12:52:50 +0300 | [diff] [blame] | 1047 | get() = this is Finishing && isCancelling |
| 1048 | |
| 1049 | // Used by parent that is waiting for child completion |
| 1050 | private class ChildCompletion( |
| 1051 | private val parent: JobSupport, |
| 1052 | private val state: Finishing, |
| 1053 | private val child: ChildJob, |
| 1054 | private val proposedUpdate: Any? |
| 1055 | ) : JobNode<Job>(child.childJob) { |
| 1056 | override fun invoke(cause: Throwable?) { |
| 1057 | parent.continueCompleting(state, child, proposedUpdate) |
| 1058 | } |
| 1059 | override fun toString(): String = |
| 1060 | "ChildCompletion[$child, $proposedUpdate]" |
| 1061 | } |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1062 | |
| 1063 | /* |
| 1064 | * ================================================================================================= |
| 1065 | * This is ready-to-use implementation for Deferred interface. |
| 1066 | * However, it is not type-safe. Conceptually it just exposes the value of the underlying |
| 1067 | * completed state as `Any?` |
| 1068 | * ================================================================================================= |
| 1069 | */ |
| 1070 | |
| 1071 | public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally |
| 1072 | |
| 1073 | public fun getCompletionExceptionOrNull(): Throwable? { |
| 1074 | val state = this.state |
| 1075 | check(state !is Incomplete) { "This job has not completed yet" } |
| 1076 | return state.exceptionOrNull |
| 1077 | } |
| 1078 | |
| 1079 | /** |
| 1080 | * @suppress **This is unstable API and it is subject to change.** |
| 1081 | */ |
| 1082 | internal fun getCompletedInternal(): Any? { |
| 1083 | val state = this.state |
| 1084 | check(state !is Incomplete) { "This job has not completed yet" } |
Roman Elizarov | 6d9f40f | 2018-04-28 14:44:02 +0300 | [diff] [blame] | 1085 | if (state is CompletedExceptionally) throw state.cause |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1086 | return state |
| 1087 | } |
| 1088 | |
| 1089 | /** |
| 1090 | * @suppress **This is unstable API and it is subject to change.** |
| 1091 | */ |
| 1092 | internal suspend fun awaitInternal(): Any? { |
| 1093 | // fast-path -- check state (avoid extra object creation) |
| 1094 | while(true) { // lock-free loop on state |
| 1095 | val state = this.state |
| 1096 | if (state !is Incomplete) { |
| 1097 | // already complete -- just return result |
Roman Elizarov | 6d9f40f | 2018-04-28 14:44:02 +0300 | [diff] [blame] | 1098 | if (state is CompletedExceptionally) throw state.cause |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1099 | return state |
| 1100 | |
| 1101 | } |
| 1102 | if (startInternal(state) >= 0) break // break unless needs to retry |
| 1103 | } |
| 1104 | return awaitSuspend() // slow-path |
| 1105 | } |
| 1106 | |
| 1107 | private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont -> |
Vsevolod Tolstopyatov | f6430f4 | 2018-04-17 17:56:32 +0300 | [diff] [blame] | 1108 | // We have to invoke await() handler only on cancellation, on completion we will be resumed regularly without handlers |
Vsevolod Tolstopyatov | f3a5013 | 2018-04-16 19:41:20 +0300 | [diff] [blame] | 1109 | cont.disposeOnCancellation(invokeOnCompletion { |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1110 | val state = this.state |
| 1111 | check(state !is Incomplete) |
| 1112 | if (state is CompletedExceptionally) |
Roman Elizarov | 6d9f40f | 2018-04-28 14:44:02 +0300 | [diff] [blame] | 1113 | cont.resumeWithException(state.cause) |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1114 | else |
| 1115 | cont.resume(state) |
| 1116 | }) |
| 1117 | } |
| 1118 | |
| 1119 | /** |
| 1120 | * @suppress **This is unstable API and it is subject to change.** |
| 1121 | */ |
| 1122 | // registerSelectAwaitInternal |
| 1123 | @Suppress("UNCHECKED_CAST") |
| 1124 | internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) { |
| 1125 | // fast-path -- check state and select/return if needed |
| 1126 | loopOnState { state -> |
| 1127 | if (select.isSelected) return |
| 1128 | if (state !is Incomplete) { |
| 1129 | // already complete -- select result |
| 1130 | if (select.trySelect(null)) { |
| 1131 | if (state is CompletedExceptionally) |
Roman Elizarov | 6d9f40f | 2018-04-28 14:44:02 +0300 | [diff] [blame] | 1132 | select.resumeSelectCancellableWithException(state.cause) |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1133 | else |
Roman Elizarov | 7587eba | 2018-07-25 12:22:46 +0300 | [diff] [blame] | 1134 | block.startCoroutineUnintercepted(state as T, select.completion) |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1135 | } |
| 1136 | return |
| 1137 | } |
| 1138 | if (startInternal(state) == 0) { |
| 1139 | // slow-path -- register waiter for completion |
| 1140 | select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(this, select, block).asHandler)) |
| 1141 | return |
| 1142 | } |
| 1143 | } |
| 1144 | } |
| 1145 | |
| 1146 | /** |
| 1147 | * @suppress **This is unstable API and it is subject to change.** |
| 1148 | */ |
| 1149 | @Suppress("UNCHECKED_CAST") |
| 1150 | internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) { |
| 1151 | val state = this.state |
| 1152 | // Note: await is non-atomic (can be cancelled while dispatched) |
| 1153 | if (state is CompletedExceptionally) |
Roman Elizarov | 6d9f40f | 2018-04-28 14:44:02 +0300 | [diff] [blame] | 1154 | select.resumeSelectCancellableWithException(state.cause) |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1155 | else |
| 1156 | block.startCoroutineCancellable(state as T, select.completion) |
| 1157 | } |
| 1158 | } |
| 1159 | |
Roman Elizarov | 563da40 | 2018-08-10 19:18:56 +0300 | [diff] [blame] | 1160 | // --------------- helper classes & constants for job implementation |
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1161 | |
// Integer codes sharing the COMPLETING_ prefix; the code that produces and consumes them
// is outside this part of the file.
private const val COMPLETING_ALREADY_COMPLETING = 0
private const val COMPLETING_COMPLETED = 1
private const val COMPLETING_WAITING_CHILDREN = 2
private const val COMPLETING_RETRY = 3

// Tri-state integer codes.
// NOTE(review): presumably the values returned by startInternal (its callers in this file
// compare the result with 0 and treat negative results as "retry") -- confirm.
private const val RETRY = -1
private const val FALSE = 0
private const val TRUE = 1

// Marker stored in Finishing._exceptionsHolder once the exceptions have been sealed.
private val SEALED = Symbol("SEALED")

// Shared Empty state objects: EMPTY_NEW reports isActive = false, EMPTY_ACTIVE reports isActive = true.
private val EMPTY_NEW = Empty(false)
private val EMPTY_ACTIVE = Empty(true)
Vsevolod Tolstopyatov | c9afb67 | 2018-07-24 20:30:48 +0300 | [diff] [blame] | 1175 | |
// Incomplete state without a node list; it is either active or new.
private class Empty(override val isActive: Boolean) : Incomplete {
    override val list: NodeList?
        get() = null

    override fun toString(): String {
        val label = if (isActive) "Active" else "New"
        return "Empty{$label}"
    }
}
| 1180 | |
// Plain job created in the active state and attached to the optional parent on construction.
internal class JobImpl(parent: Job? = null) : JobSupport(true) {
    init {
        initParentJobInternal(parent)
    }

    override val onFailComplete: Boolean
        get() = true

    override val handlesException: Boolean
        get() = false
}
| 1186 | |
| 1187 | // -------- invokeOnCompletion nodes |
| 1188 | |
// Common shape of the state objects of a job that has not completed yet.
internal interface Incomplete {
    // whether this state counts as active
    val isActive: Boolean
    val list: NodeList? // is null only for Empty and JobNode incomplete state objects
}
| 1193 | |
/**
 * Base class for nodes attached to a [job]. A node is a disposable handle and also an
 * [Incomplete] state object that reports itself as active and has no list of its own.
 *
 * @param job the job this node belongs to.
 */
internal abstract class JobNode<out J : Job>(
    @JvmField val job: J
) : CompletionHandlerBase(), DisposableHandle, Incomplete {
    override val isActive: Boolean get() = true
    override val list: NodeList? get() = null
    // Disposal is delegated to the owning job; the cast requires the job to be a JobSupport.
    override fun dispose() = (job as JobSupport).removeNode(this)
}
| 1201 | |
// List of job nodes that also serves as an active Incomplete state whose list is itself.
internal class NodeList : LockFreeLinkedListHead(), Incomplete {
    override val isActive: Boolean
        get() = true

    override val list: NodeList
        get() = this

    // Renders as List{<state>}[node, node, ...] using the given state label.
    fun getString(state: String): String = buildString {
        append("List{$state}[")
        var printed = 0
        this@NodeList.forEach<JobNode<*>> { node ->
            if (printed++ > 0) append(", ")
            append(node)
        }
        append("]")
    }

    override fun toString(): String {
        return getString("Active")
    }
}
| 1220 | |
// Wrapper that presents a NodeList as a non-active ("New") Incomplete state.
internal class InactiveNodeList(
    override val list: NodeList
) : Incomplete {
    override val isActive: Boolean
        get() = false

    override fun toString(): String {
        return list.getString("New")
    }
}
| 1227 | |
// Node that forwards its invocation to the wrapped completion handler.
private class InvokeOnCompletion(
    job: Job,
    private val handler: CompletionHandler
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        handler(cause)
    }

    override fun toString(): String {
        return "InvokeOnCompletion[$classSimpleName@$hexAddress]"
    }
}
| 1235 | |
// Node that resumes the given continuation with Unit when invoked; the cause is ignored.
private class ResumeOnCompletion(
    job: Job,
    private val continuation: Continuation<Unit>
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        continuation.resume(Unit)
    }

    override fun toString(): String {
        return "ResumeOnCompletion[$continuation]"
    }
}
| 1243 | |
// Node that disposes the given handle when invoked; the cause is ignored.
internal class DisposeOnCompletion(
    job: Job,
    private val handle: DisposableHandle
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        handle.dispose()
    }

    override fun toString(): String {
        return "DisposeOnCompletion[$handle]"
    }
}
| 1251 | |
// Node for a join-style select clause: when invoked it tries to select the clause and,
// on success, starts the block in cancellable mode.
private class SelectJoinOnCompletion<R>(
    job: JobSupport,
    private val select: SelectInstance<R>,
    private val block: suspend () -> R
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        if (!select.trySelect(null)) return
        block.startCoroutineCancellable(select.completion)
    }

    override fun toString(): String {
        return "SelectJoinOnCompletion[$select]"
    }
}
| 1263 | |
// Node for an await-style select clause: when invoked it tries to select the clause and,
// on success, lets the job deliver its state through selectAwaitCompletion.
private class SelectAwaitOnCompletion<T, R>(
    job: JobSupport,
    private val select: SelectInstance<R>,
    private val block: suspend (T) -> R
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        if (!select.trySelect(null)) return
        job.selectAwaitCompletion(select, block)
    }

    override fun toString(): String {
        return "SelectAwaitOnCompletion[$select]"
    }
}
| 1275 | |
| 1276 | // -------- invokeOnCancellation nodes |
| 1277 | |
/**
 * Marker for a node that shall be invoked in the _failing_ state.
 * **Note: may be invoked multiple times.**
 */
internal abstract class JobFailingNode<out J : Job>(job: J) : JobNode<J>(job)
Vsevolod Tolstopyatov | d521478 | 2018-04-13 14:51:30 +0300 | [diff] [blame] | 1283 | |
// Failing node that forwards to the wrapped handler, but only on its first invocation.
private class InvokeOnFailing(
    job: Job,
    private val handler: CompletionHandler
) : JobFailingNode<Job>(job) {
    // delegate handler shall be invoked at most once, so here is an additional flag
    private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu

    override fun invoke(cause: Throwable?) {
        // only the caller that flips the flag from 0 to 1 gets to run the handler
        if (!_invoked.compareAndSet(0, 1)) return
        handler.invoke(cause)
    }

    override fun toString(): String {
        return "InvokeOnFailing[$classSimpleName@$hexAddress]"
    }
}
| 1295 | |
// Node registered on the parent for a child job: invoking it cancels the child on behalf of
// the parent, and a failure reported by the child is forwarded to the parent.
internal class ChildJob(
    parent: JobSupport,
    @JvmField val childJob: Job
) : JobFailingNode<JobSupport>(parent), ChildHandle {
    override fun invoke(cause: Throwable?) {
        childJob.cancelChild(job)
    }

    override fun childFailed(cause: Throwable): Boolean {
        return job.childFailed(cause)
    }

    override fun toString(): String {
        return "ChildJob[$childJob]"
    }
}
| 1304 | |
// Same as ChildJob, but for cancellable continuation:
// invoking it cancels the continuation with the parent's cancellation exception.
internal class ChildContinuation(
    parent: Job,
    @JvmField val child: AbstractContinuation<*>
) : JobFailingNode<Job>(parent) {
    override fun invoke(cause: Throwable?) {
        val reason = job.getCancellationException()
        child.cancel(reason)
    }

    override fun toString(): String = "ChildContinuation[$child]"
}
| 1316 | |