blob: 9c55e3fc39f14f571fa32162e39f3950474ca191 [file] [log] [blame]
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03001/*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03005package kotlinx.coroutines.experimental
6
7import kotlinx.atomicfu.*
8import kotlinx.coroutines.experimental.internal.*
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03009import kotlinx.coroutines.experimental.intrinsics.*
10import kotlinx.coroutines.experimental.selects.*
11import kotlin.coroutines.experimental.*
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030012
13/**
14 * A concrete implementation of [Job]. It is optionally a child to a parent job.
15 * This job is cancelled when the parent is complete, but not vise-versa.
16 *
17 * This is an open class designed for extension by more specific classes that might augment the
18 * state and mare store addition state information for completed jobs, like their result values.
19 *
20 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
21 * @suppress **This is unstable API and it is subject to change.**
22 */
23internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 {
24 final override val key: CoroutineContext.Key<*> get() = Job
25
26 /*
27 === Internal states ===
28
Roman Elizarovecbc85c2018-09-14 12:52:50 +030029 name state class public state description
30 ------ ------------ ------------ -----------
31 EMPTY_N EmptyNew : New no listeners
32 EMPTY_A EmptyActive : Active no listeners
33 SINGLE JobNode : Active a single listener
34 SINGLE+ JobNode : Active a single listener + NodeList added as its next
35 LIST_N InactiveNodeList : New a list of listeners (promoted once, does not got back to EmptyNew)
36 LIST_A NodeList : Active a list of listeners (promoted once, does not got back to JobNode/EmptyActive)
37 COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*)
38 CANCELLING Finishing : Cancelling -- " --
39 FINAL_C Cancelled : Cancelled cancelled (final state)
40 FINAL_F CompletedExceptionally : Completed failed for other reason (final state)
41 FINAL_R <any> : Completed produced some result
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030042
43 === Transitions ===
44
45 New states Active states Inactive states
46
47 +---------+ +---------+ }
Roman Elizarovecbc85c2018-09-14 12:52:50 +030048 | EMPTY_N | ----> | EMPTY_A | ----+ } Empty states
49 +---------+ +---------+ | }
50 | | | ^ | +----------+
51 | | | | +--> | FINAL_* |
52 | | V | | +----------+
53 | | +---------+ | }
54 | | | SINGLE | ----+ } JobNode states
55 | | +---------+ | }
56 | | | | }
57 | | V | }
58 | | +---------+ | }
59 | +-------> | SINGLE+ | ----+ }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030060 | +---------+ | }
61 | | |
62 V V |
63 +---------+ +---------+ | }
Roman Elizarovecbc85c2018-09-14 12:52:50 +030064 | LIST_N | ----> | LIST_A | ----+ } [Inactive]NodeList states
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030065 +---------+ +---------+ | }
66 | | | | |
67 | | +--------+ | |
68 | | | V |
69 | | | +------------+ | +------------+ }
70 | +-------> | COMPLETING | --+-- | CANCELLING | } Finishing states
71 | | +------------+ +------------+ }
72 | | | ^
73 | | | |
74 +--------+---------+--------------------+
75
76
77 This state machine and its transition matrix are optimized for the common case when job is created in active
Roman Elizarovecbc85c2018-09-14 12:52:50 +030078 state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
79 successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
80 state without going to COMPLETING state)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030081
82 Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
Roman Elizarovff0aab82018-09-22 17:11:53 +030083
84 ---------- TIMELINE of state changes and notification in Job lifecycle ----------
85
86 | The longest possible chain of events in shown, shorter versions cut-through intermediate states,
87 | while still performing all the notifications in this order.
88
89 + Job object is created
90 ## NEW: state == EMPTY_ACTIVE | is InactiveNodeList
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +030091 + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
92 ~ waits for start
93 >> start / join / await invoked
Roman Elizarovff0aab82018-09-22 17:11:53 +030094 ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +030095 + onStartInternal / onStart (lazy coroutine is started)
96 ~ active coroutine is working (or scheduled to execution)
97 >> childFailed / fail invoked
Roman Elizarovff0aab82018-09-22 17:11:53 +030098 ## FAILING: state is Finishing, state.rootCause != null
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +030099 ------ failing listeners are not admitted anymore, invokeOnCompletion(onFailing=true) returns NonDisposableHandle
100 ------ new children get immediately cancelled, but are still admitted to the list
101 + onFailing
102 + notifyFailing (invoke all failing listeners -- cancel all children, suspended functions resume with exception)
103 + failParent (rootCause of failure is communicated to the parent, parent starts failing, too)
104 ~ waits for completion of coroutine body
105 >> makeCompleting / makeCompletingOnce invoked
Roman Elizarovff0aab82018-09-22 17:11:53 +0300106 ## COMPLETING: state is Finishing, state.isCompleting == true
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +0300107 ------ new children are not admitted anymore, attachChild returns NonDisposableHandle
108 ~ waits for children
109 >> last child completes
110 - computes the final exception
Roman Elizarovff0aab82018-09-22 17:11:53 +0300111 ## SEALED: state is Finishing, state.isSealed == true
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +0300112 ------ cancel/childFailed returns false (cannot handle exceptions anymore)
113 + failParent (final exception is communicated to the parent, parent incorporates it)
114 + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
115 ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
116 ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
117 + parentHandle.dispose
118 + notifyCompletion (invoke all completion listeners)
119 + onCompletionInternal / onCompleted / onCompletedExceptionally
Roman Elizarovff0aab82018-09-22 17:11:53 +0300120
121 ---------------------------------------------------------------------------------
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300122 */
123
124 // Note: use shared objects while we have no listeners
Roman Elizarov563da402018-08-10 19:18:56 +0300125 private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300126
127 @Volatile
Roman Elizarov5d18d022018-09-22 22:13:05 +0300128 private var parentHandle: ChildHandle? = null
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300129
130 // ------------ initialization ------------
131
132 /**
133 * Initializes parent job.
134 * It shall be invoked at most once after construction after all other initialization.
135 * @suppress **This is unstable API and it is subject to change.**
136 */
137 internal fun initParentJobInternal(parent: Job?) {
138 check(parentHandle == null)
139 if (parent == null) {
140 parentHandle = NonDisposableHandle
141 return
142 }
143 parent.start() // make sure the parent is started
144 @Suppress("DEPRECATION")
145 val handle = parent.attachChild(this)
146 parentHandle = handle
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300147 // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300148 if (isCompleted) {
149 handle.dispose()
150 parentHandle = NonDisposableHandle // release it just in case, to aid GC
151 }
152 }
153
154 // ------------ state query ------------
155
156 /**
157 * Returns current state of this job.
158 * @suppress **This is unstable API and it is subject to change.**
159 */
160 internal val state: Any? get() {
161 _state.loop { state -> // helper loop on state (complete in-progress atomic operations)
162 if (state !is OpDescriptor) return state
163 state.perform(this)
164 }
165 }
166
167 /**
168 * @suppress **This is unstable API and it is subject to change.**
169 */
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300170 private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300171 while (true) {
172 block(state)
173 }
174 }
175
Vsevolod Tolstopyatov79414ec2018-08-30 16:50:56 +0300176 public override val isActive: Boolean get() {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300177 val state = this.state
178 return state is Incomplete && state.isActive
179 }
180
181 public final override val isCompleted: Boolean get() = state !is Incomplete
182
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300183 public final override val isFailed: Boolean get() {
184 val state = this.state
185 return state is CompletedExceptionally || (state is Finishing && state.isFailing)
186 }
187
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300188 public final override val isCancelled: Boolean get() {
189 val state = this.state
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300190 return state is Cancelled || (state is Finishing && state.isCancelling)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300191 }
192
193 // ------------ state update ------------
194
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300195 // Finalizes Finishing -> Completed (terminal state) transition.
196 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
197 private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
198 require(proposedUpdate !is Incomplete) // only incomplete -> completed transition is allowed
199 require(this.state === state) // consistency check -- it cannot change
200 require(!state.isSealed) // consistency check -- cannot be sealed yet
201 require(state.isCompleting) // consistency check -- must be marked as completing
202 val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
203 val proposedCancel = proposedUpdate is Cancelled
204 // Create the final exception and seal the state so that no more exceptions can be added
205 var suppressed = false
206 val finalException = synchronized(state) {
207 val exceptions = state.sealLocked(proposedException)
208 val rootCause = getFinalRootCause(state, exceptions)
209 if (rootCause != null) suppressed = suppressExceptions(rootCause, exceptions)
210 rootCause
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +0300211 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300212 // Create the final state object
213 val finalState = when {
214 // if we have not failed -> use proposed update value
215 finalException == null -> proposedUpdate
216 // small optimization when we can used proposeUpdate object as is on failure
217 finalException === proposedException && proposedCancel == state.isCancelling -> proposedUpdate
218 // cancelled job final state
219 state.isCancelling -> Cancelled(finalException)
220 // failed job final state
221 else -> CompletedExceptionally(finalException)
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300222 }
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +0300223
224 // Now handle exception if parent can't handle it
225 if (finalException != null && !failParent(finalException)) {
226 handleJobException(finalException)
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300227 }
228 // Then CAS to completed state -> it must succeed
229 require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
230 // And process all post-completion actions
231 completeStateFinalization(state, finalState, mode, suppressed)
232 return true
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300233 }
234
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300235 private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
236 // A case of no exceptions
237 if (exceptions.isEmpty()) {
238 // materialize cancellation exception if it was not materialized yet
239 if (state.isCancelling) return createJobCancellationException()
240 return null
241 }
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300242 /*
243 * This is a place where we step on our API limitation:
244 * We can't distinguish internal JobCancellationException from our parent
245 * from external cancellation, thus we ought to collect all exceptions.
246 *
247 * But it has negative consequences: same exception can be added as suppressed more than once.
248 * Consider concurrent parent-child relationship:
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300249 * 1) Child throws E1 and parent throws E2.
250 * 2) Parent goes to "Failing(E1)" and cancels child with E1
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300251 * 3) Child goes to "Cancelling(E1)", but throws an exception E2
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300252 * 4) When child throws, it notifies parent that it is failing, adding its exception to parent's list of exceptions/
253 * 5) Child builds final exception: E1 with suppressed E2, reports it to parent.
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300254 * 6) Parent aggregates three exceptions: original E1, reported E2 and "final" E1.
255 * It filters the third exception, but adds the second one to the first one, thus adding suppressed duplicate.
256 *
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300257 * Note that it's only happening when both parent and child throw exception simultaneously.
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300258 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300259 var rootCause = exceptions[0]
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300260 if (rootCause is JobCancellationException) {
261 val cause = unwrap(rootCause)
262 rootCause = if (cause !== null) {
263 cause
264 } else {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300265 exceptions.firstOrNull { unwrap(it) != null } ?: return rootCause
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300266 }
267 }
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300268 return rootCause
269 }
270
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300271 private fun suppressExceptions(rootCause: Throwable, exceptions: List<Throwable>): Boolean {
272 if (exceptions.size <= 1) return false // nothing more to do here
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +0300273 val seenExceptions = identitySet<Throwable>(exceptions.size)
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300274 var suppressed = false
275 for (i in 1 until exceptions.size) {
276 val unwrapped = unwrap(exceptions[i])
277 if (unwrapped !== null && unwrapped !== rootCause) {
278 if (seenExceptions.add(unwrapped)) {
279 rootCause.addSuppressedThrowable(unwrapped)
280 suppressed = true
281 }
282 }
283 }
284 return suppressed
285 }
286
Roman Elizarov563da402018-08-10 19:18:56 +0300287 private tailrec fun unwrap(exception: Throwable): Throwable? =
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300288 if (exception is JobCancellationException) {
289 val cause = exception.cause
Roman Elizarov563da402018-08-10 19:18:56 +0300290 if (cause !== null) unwrap(cause) else null
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300291 } else {
Roman Elizarov563da402018-08-10 19:18:56 +0300292 exception
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300293 }
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300294
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300295 // fast-path method to finalize normally completed coroutines without children
296 private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
Roman Elizarov7e238752018-09-20 15:05:41 +0300297 check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300298 check(update !is CompletedExceptionally) // only for normal completion
299 if (!_state.compareAndSet(state, update)) return false
300 completeStateFinalization(state, update, mode, false)
Roman Elizarov563da402018-08-10 19:18:56 +0300301 return true
302 }
303
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300304 // suppressed == true when any exceptions were suppressed while building the final completion cause
305 private fun completeStateFinalization(state: Incomplete, update: Any?, mode: Int, suppressed: Boolean) {
Roman Elizarov563da402018-08-10 19:18:56 +0300306 /*
307 * Now the job in THE FINAL state. We need to properly handle the resulting state.
308 * Order of various invocations here is important.
309 *
Vsevolod Tolstopyatovc7581692018-08-13 17:18:01 +0300310 * 1) Unregister from parent job.
Roman Elizarov563da402018-08-10 19:18:56 +0300311 */
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300312 parentHandle?.let {
313 it.dispose() // volatile read parentHandle _after_ state was updated
314 parentHandle = NonDisposableHandle // release it just in case, to aid GC
315 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300316 val cause = (update as? CompletedExceptionally)?.cause
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300317 /*
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300318 * 2) Invoke onFailing: for resource cancellation resource cancellation etc.
319 * Only notify is was not notified yet.
320 * Note: we do not use notifyFailing here, since we are going to invoke all completion as our next step
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300321 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300322 if (!state.isFailing) onFailing(cause)
Roman Elizarov563da402018-08-10 19:18:56 +0300323 /*
Vsevolod Tolstopyatovc7581692018-08-13 17:18:01 +0300324 * 3) Invoke completion handlers: .join(), callbacks etc.
Roman Elizarov563da402018-08-10 19:18:56 +0300325 * It's important to invoke them only AFTER exception handling, see #208
326 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300327 if (state is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300328 try {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300329 state.invoke(cause)
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300330 } catch (ex: Throwable) {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300331 handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300332 }
333 } else {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300334 state.list?.notifyCompletion(cause)
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300335 }
Roman Elizarov563da402018-08-10 19:18:56 +0300336 /*
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300337 * 4) Invoke onCompletionInternal: onNext(), timeout de-registration etc.
Roman Elizarov563da402018-08-10 19:18:56 +0300338 * It should be last so all callbacks observe consistent state
339 * of the job which doesn't depend on callback scheduling.
340 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300341 onCompletionInternal(update, mode, suppressed)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300342 }
343
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300344 private fun notifyFailing(list: NodeList, cause: Throwable) {
345 // first cancel our own children
346 onFailing(cause)
347 notifyHandlers<JobFailingNode<*>>(list, cause)
348 // then report to the parent that we are failing
349 failParent(cause) // tentative failure report -- does not matter if there is no parent
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300350 }
351
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300352 private fun NodeList.notifyCompletion(cause: Throwable?) =
353 notifyHandlers<JobNode<*>>(this, cause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300354
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300355 private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300356 var exception: Throwable? = null
357 list.forEach<T> { node ->
358 try {
359 node.invoke(cause)
360 } catch (ex: Throwable) {
361 exception?.apply { addSuppressedThrowable(ex) } ?: run {
362 exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
363 }
364 }
365 }
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300366 exception?.let { handleOnCompletionException(it) }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300367 }
368
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300369 public final override fun start(): Boolean {
370 loopOnState { state ->
371 when (startInternal(state)) {
372 FALSE -> return false
373 TRUE -> return true
374 }
375 }
376 }
377
378 // returns: RETRY/FALSE/TRUE:
379 // FALSE when not new,
380 // TRUE when started
381 // RETRY when need to retry
382 private fun startInternal(state: Any?): Int {
383 when (state) {
384 is Empty -> { // EMPTY_X state -- no completion handlers
385 if (state.isActive) return FALSE // already active
Roman Elizarov563da402018-08-10 19:18:56 +0300386 if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300387 onStartInternal()
388 return TRUE
389 }
Roman Elizarovede29232018-09-18 12:53:09 +0300390 is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
391 if (!_state.compareAndSet(state, state.list)) return RETRY
392 onStartInternal()
393 return TRUE
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300394 }
395 else -> return FALSE // not a new state
396 }
397 }
398
399 /**
400 * Override to provide the actual [start] action.
401 * This function is invoked exactly once when non-active coroutine is [started][start].
402 */
403 internal open fun onStartInternal() {}
404
405 public final override fun getCancellationException(): CancellationException {
406 val state = this.state
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300407 return when (state) {
408 is Finishing -> state.rootCause?.toCancellationException("Job is failing")
409 ?: error("Job is still new or active: $this")
410 is Incomplete -> error("Job is still new or active: $this")
411 is CompletedExceptionally -> state.cause.toCancellationException("Job has failed")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300412 else -> JobCancellationException("Job has completed normally", null, this)
413 }
414 }
415
416 private fun Throwable.toCancellationException(message: String): CancellationException =
417 this as? CancellationException ?: JobCancellationException(message, this, this@JobSupport)
418
419 /**
420 * Returns the cause that signals the completion of this job -- it returns the original
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300421 * [cancel] cause, [JobCancellationException] or **`null` if this job had completed normally**.
422 * This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300423 * failing yet.
424 *
425 * @suppress **This is unstable API and it is subject to change.**
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300426 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300427 protected fun getCompletionCause(): Throwable? = loopOnState { state ->
428 return when (state) {
429 is Finishing -> state.rootCause
430 ?: error("Job is still new or active: $this")
431 is Incomplete -> error("Job is still new or active: $this")
432 is CompletedExceptionally -> state.cause
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300433 else -> null
434 }
435 }
436
437 @Suppress("OverridingDeprecatedMember")
438 public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300439 invokeOnCompletion(onFailing = false, invokeImmediately = true, handler = handler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300440
441 @Suppress("OverridingDeprecatedMember")
442 public final override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle =
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300443 invokeOnCompletion(onFailing = onCancelling, invokeImmediately = true, handler = handler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300444
445 @Suppress("OverridingDeprecatedMember")
446 public final override fun invokeOnCompletion(onCancelling_: Boolean, handler: CompletionHandler): DisposableHandle =
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300447 invokeOnCompletion(onFailing = onCancelling_, invokeImmediately = true, handler = handler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300448
449 // todo: non-final as a workaround for KT-21968, should be final in the future
450 public override fun invokeOnCompletion(
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300451 onFailing: Boolean,
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300452 invokeImmediately: Boolean,
453 handler: CompletionHandler
454 ): DisposableHandle {
455 var nodeCache: JobNode<*>? = null
456 loopOnState { state ->
457 when (state) {
458 is Empty -> { // EMPTY_X state -- no completion handlers
459 if (state.isActive) {
460 // try move to SINGLE state
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300461 val node = nodeCache ?: makeNode(handler, onFailing).also { nodeCache = it }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300462 if (_state.compareAndSet(state, node)) return node
463 } else
464 promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
465 }
466 is Incomplete -> {
467 val list = state.list
468 if (list == null) { // SINGLE/SINGLE+
469 promoteSingleToNodeList(state as JobNode<*>)
470 } else {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300471 var rootCause: Throwable? = null
472 var handle: DisposableHandle = NonDisposableHandle
473 if (onFailing && state is Finishing) {
474 synchronized(state) {
475 // check if we are installing failing handler on job that is failing
476 rootCause = state.rootCause // != null if we are failing
477 // We add node to the list in two cases --- either the job is not failing
478 // or we are adding a child to a coroutine that is not completing yet
479 if (rootCause == null || handler.isHandlerOf<ChildJob>() && !state.isCompleting) {
480 // Note: add node the list while holding lock on state (make sure it cannot change)
481 val node = nodeCache ?: makeNode(handler, onFailing).also { nodeCache = it }
482 if (!addLastAtomic(state, list, node)) return@loopOnState // retry
483 // just return node if we don't have to invoke handler (not failing yet)
484 if (rootCause == null) return node
485 // otherwise handler is invoked immediately out of the synchronized section & handle returned
486 handle = node
487 }
488 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300489 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300490 if (rootCause != null) {
491 // Note: attachChild uses invokeImmediately, so it gets invoked when adding to failing job
492 if (invokeImmediately) handler.invokeIt(rootCause)
493 return handle
494 } else {
495 val node = nodeCache ?: makeNode(handler, onFailing).also { nodeCache = it }
496 if (addLastAtomic(state, list, node)) return node
497 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300498 }
499 }
500 else -> { // is complete
501 // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
502 // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
503 if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
504 return NonDisposableHandle
505 }
506 }
507 }
508 }
509
510 private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300511 return if (onCancelling)
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300512 (handler as? JobFailingNode<*>)?.also { require(it.job === this) }
513 ?: InvokeOnFailing(this, handler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300514 else
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300515 (handler as? JobNode<*>)?.also { require(it.job === this && it !is JobFailingNode) }
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300516 ?: InvokeOnCompletion(this, handler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300517 }
518
519 private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>) =
520 list.addLastIf(node) { this.state === expect }
521
522 private fun promoteEmptyToNodeList(state: Empty) {
Roman Elizarovede29232018-09-18 12:53:09 +0300523 // try to promote it to LIST state with the corresponding state
524 val list = NodeList()
525 val update = if (state.isActive) list else InactiveNodeList(list)
526 _state.compareAndSet(state, update)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300527 }
528
529 private fun promoteSingleToNodeList(state: JobNode<*>) {
530 // try to promote it to list (SINGLE+ state)
Roman Elizarovede29232018-09-18 12:53:09 +0300531 state.addOneIfEmpty(NodeList())
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300532 // it must be in SINGLE+ state or state has changed (node could have need removed from state)
533 val list = state.nextNode // either our NodeList or somebody else won the race, updated state
534 // just attempt converting it to list if state is still the same, then we'll continue lock-free loop
535 _state.compareAndSet(state, list)
536 }
537
538 public final override suspend fun join() {
539 if (!joinInternal()) { // fast-path no wait
Roman Elizarov222f3f22018-07-13 18:47:17 +0300540 coroutineContext.checkCompletion()
541 return // do not suspend
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300542 }
543 return joinSuspend() // slow-path wait
544 }
545
546 private fun joinInternal(): Boolean {
547 loopOnState { state ->
548 if (state !is Incomplete) return false // not active anymore (complete) -- no need to wait
549 if (startInternal(state) >= 0) return true // wait unless need to retry
550 }
551 }
552
553 private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300554 // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
555 cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300556 }
557
558 public final override val onJoin: SelectClause0
559 get() = this
560
561 // registerSelectJoin
562 public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
563 // fast-path -- check state and select/return if needed
564 loopOnState { state ->
565 if (select.isSelected) return
566 if (state !is Incomplete) {
567 // already complete -- select result
568 if (select.trySelect(null)) {
569 select.completion.context.checkCompletion() // always check for our completion
Roman Elizarov7587eba2018-07-25 12:22:46 +0300570 block.startCoroutineUnintercepted(select.completion)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300571 }
572 return
573 }
574 if (startInternal(state) == 0) {
575 // slow-path -- register waiter for completion
576 select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(this, select, block).asHandler))
577 return
578 }
579 }
580 }
581
582 /**
583 * @suppress **This is unstable API and it is subject to change.**
584 */
585 internal fun removeNode(node: JobNode<*>) {
586 // remove logic depends on the state of the job
587 loopOnState { state ->
588 when (state) {
589 is JobNode<*> -> { // SINGE/SINGLE+ state -- one completion handler
590 if (state !== node) return // a different job node --> we were already removed
591 // try remove and revert back to empty state
Roman Elizarov563da402018-08-10 19:18:56 +0300592 if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300593 }
594 is Incomplete -> { // may have a list of completion handlers
595 // remove node from the list if there is a list
596 if (state.list != null) node.remove()
597 return
598 }
599 else -> return // it is complete and does not have any completion handlers
600 }
601 }
602 }
603
604 /**
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300605 * Returns `true` for job that do not have "body block" to complete and should immediately go into
606 * completing state and start waiting for children.
607 *
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300608 * @suppress **This is unstable API and it is subject to change.**
609 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300610 internal open val onFailComplete: Boolean get() = false
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300611
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300612 // external cancel with (optional) cause
613 public override fun cancel(cause: Throwable?): Boolean =
614 fail(cause, cancel = true) && handlesException
615
616 // child is reporting failure to the parent
Roman Elizarov5d18d022018-09-22 22:13:05 +0300617 internal fun childFailed(cause: Throwable) =
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300618 fail(cause, cancel = false) && handlesException
619
620 // parent is cancelling child
621 public override fun cancelChild(parentJob: Job) {
622 fail(parentJob, cancel = true)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300623 }
624
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300625 // cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
626 // returns true is exception was handled, false otherwise
627 private fun fail(cause: Any?, cancel: Boolean): Boolean {
628 if (onFailComplete) {
629 // make sure it is completing, if failMakeCompleting returns true it means it had make it
630 // completing and had recorded exception
631 if (failMakeCompleting(cause, cancel)) return true
632 // otherwise just record failure via makeFailing below
633 }
634 return makeFailing(cause, cancel)
635 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300636
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300637 private fun failMakeCompleting(cause: Any?, cancel: Boolean): Boolean {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300638 loopOnState { state ->
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300639 if (state !is Incomplete || state is Finishing && state.isCompleting) {
640 return false // already completed/completing, do not even propose update
641 }
642 val proposedUpdate = createFailure(createCauseException(cause), cancel)
643 when (tryMakeCompleting(state, proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
644 COMPLETING_ALREADY_COMPLETING -> return false
645 COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true
646 COMPLETING_RETRY -> return@loopOnState
647 else -> error("unexpected result")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300648 }
649 }
650 }
651
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300652 private fun createJobCancellationException() =
653 JobCancellationException("Job was cancelled", null, this)
654
655 // cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
656 private fun createCauseException(cause: Any?): Throwable = when(cause) {
657 is Throwable? -> cause ?: createJobCancellationException()
658 else -> (cause as Job).getCancellationException()
659 }
660
661 private fun createFailure(causeException: Throwable, cancel: Boolean): CompletedExceptionally =
662 when {
663 cancel -> Cancelled(causeException)
664 else -> CompletedExceptionally(causeException)
665 }
666
667 // transitions to Failing state
668 // cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
669 private fun makeFailing(cause: Any?, cancel: Boolean): Boolean {
670 var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
671 loopOnState { state ->
672 when (state) {
673 is Finishing -> { // already finishing -- collect exceptions
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +0300674 val notifyRootCause = synchronized(state) {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300675 if (state.isSealed) return false // too late, already sealed -- cannot add exception nor mark cancelled
676 // add exception, do nothing is parent is cancelling child that is already failing
677 val wasFailing = state.isFailing // will notify if was not failing
678 // Materialize missing exception if it is the first exception (otherwise -- don't)
679 if (cause != null || !wasFailing) {
680 val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
681 state.addExceptionLocked(causeException)
682 }
683 // mark as cancelling if cancel was requested
684 if (cancel) state.isCancelling = true
685 // take cause for notification is was not failing before
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +0300686 state.rootCause.takeIf { !wasFailing }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300687 }
688 notifyRootCause?.let { notifyFailing(state.list, it) }
689 return true
690 }
691 is Incomplete -> {
692 // Not yet finishing -- try to make it failing
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300693 val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
694 if (state.isActive) {
695 // active state becomes failing
Roman Elizarovf45ec8d2018-09-24 23:00:11 +0300696 if (tryMakeFailing(state, causeException, cancel)) return true
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300697 } else {
698 // non active state starts completing
699 when (tryMakeCompleting(state, createFailure(causeException, cancel), mode = MODE_ATOMIC_DEFAULT)) {
700 COMPLETING_ALREADY_COMPLETING -> error("Cannot happen in $state")
701 COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true // ok
702 COMPLETING_RETRY -> return@loopOnState
703 else -> error("unexpected result")
704 }
705 }
706 }
707 else -> return false // already complete
708 }
709 }
710 }
711
Roman Elizarovf45ec8d2018-09-24 23:00:11 +0300712 // Performs promotion of incomplete coroutine state to NodeList for the purpose of
713 // converting coroutine state to Failing, returns null when need to retry
714 private fun getOrPromoteFailingList(state: Incomplete): NodeList? = state.list ?:
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300715 when (state) {
Roman Elizarovf45ec8d2018-09-24 23:00:11 +0300716 is Empty -> NodeList() // we can allocate new empty list that'll get integrated into Failing state
717 is JobNode<*> -> {
718 // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
719 // correctly capture a reference to it
720 promoteSingleToNodeList(state)
721 null // retry
722 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300723 else -> error("State should have list: $state")
724 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300725
726 // try make new failing state on the condition that we're still in the expected state
Roman Elizarovf45ec8d2018-09-24 23:00:11 +0300727 private fun tryMakeFailing(state: Incomplete, rootCause: Throwable, cancel: Boolean): Boolean {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300728 check(state !is Finishing) // only for non-finishing states
729 check(state.isActive) // only for active states
Roman Elizarovf45ec8d2018-09-24 23:00:11 +0300730 // get state's list or else promote to list to correctly operate on child lists
731 val list = getOrPromoteFailingList(state) ?: return false
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300732 // Create failing state (with rootCause!)
733 val failing = Finishing(list, cancel, false, rootCause)
734 if (!_state.compareAndSet(state, failing)) return false
735 // Notify listeners
736 notifyFailing(list, rootCause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300737 return true
738 }
739
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300740 /**
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300741 * This function is used by [CompletableDeferred.complete] (and exceptionally) and by [JobImpl.cancel].
742 * It returns `false` on repeated invocation (when this job is already completing).
743 *
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300744 * @suppress **This is unstable API and it is subject to change.**
745 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300746 internal fun makeCompleting(proposedUpdate: Any?): Boolean = loopOnState { state ->
747 when (tryMakeCompleting(state, proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
748 COMPLETING_ALREADY_COMPLETING -> return false
749 COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true
750 COMPLETING_RETRY -> return@loopOnState
751 else -> error("unexpected result")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300752 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300753 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300754
755 /**
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300756 * This function is used by [AbstractCoroutine.resume].
757 * It throws exception on repeated invocation (when this job is already completing).
758 *
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300759 * Returns:
760 * * `true` if state was updated to completed/cancelled;
761 * * `false` if made completing or it is cancelling and is waiting for children.
762 *
763 * @throws IllegalStateException if job is already complete or completing
764 * @suppress **This is unstable API and it is subject to change.**
765 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300766 internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean = loopOnState { state ->
767 when (tryMakeCompleting(state, proposedUpdate, mode)) {
768 COMPLETING_ALREADY_COMPLETING -> throw IllegalStateException("Job $this is already complete or completing, " +
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300769 "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300770 COMPLETING_COMPLETED -> return true
771 COMPLETING_WAITING_CHILDREN -> return false
772 COMPLETING_RETRY -> return@loopOnState
773 else -> error("unexpected result")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300774 }
775 }
776
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300777 private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?, mode: Int): Int {
778 if (state !is Incomplete)
779 return COMPLETING_ALREADY_COMPLETING
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +0300780 /*
781 * FAST PATH -- no children to wait for && simple state (no list) && not failing => can complete immediately
782 * Failures always have to go through Finishing state to serialize exception handling.
783 * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
784 * which may miss unhandled exception.
785 */
Roman Elizarov6634ed72018-09-20 15:13:06 +0300786 if ((state is Empty || state is JobNode<*>) && state !is ChildJob && proposedUpdate !is CompletedExceptionally) {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300787 if (!tryFinalizeSimpleState(state, proposedUpdate, mode)) return COMPLETING_RETRY
788 return COMPLETING_COMPLETED
789 }
790 // get state's list or else promote to list to correctly operate on child lists
Roman Elizarovf45ec8d2018-09-24 23:00:11 +0300791 val list = getOrPromoteFailingList(state) ?: return COMPLETING_RETRY
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300792 // promote to Finishing state if we are not in it yet
793 // This promotion has to be atomic w.r.t to state change, so that a coroutine that is not active yet
794 // atomically transition to finishing & completing state
795 val finishing = state as? Finishing ?: Finishing(list, false, false, null)
796 // must synchronize updates to finishing state
797 var notifyRootCause: Throwable? = null
798 synchronized(finishing) {
799 // check if this state is already completing
800 if (finishing.isCompleting) return COMPLETING_ALREADY_COMPLETING
801 // mark as completing
802 finishing.isCompleting = true
803 // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
804 require(!finishing.isSealed) // cannot be sealed
805 // mark as cancelling is the proposed update is to cancel
806 if (proposedUpdate is Cancelled) finishing.isCancelling = true
807 // add new proposed exception to the finishing state
808 val wasFailing = finishing.isFailing
809 (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
810 // If it just becomes failing --> must process failing notifications
811 notifyRootCause = finishing.rootCause.takeIf { !wasFailing }
812 }
813 // if we need to promote to finishing then atomically do it here
814 if (finishing !== state) {
815 if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
816 }
817 // process failing notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)
818 notifyRootCause?.let { notifyFailing(list, it) }
819 // now wait for children
Roman Elizarov6634ed72018-09-20 15:13:06 +0300820 val child = firstChild(state)
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300821 if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
822 return COMPLETING_WAITING_CHILDREN
823 // otherwise -- we have not children left (all were already cancelled?)
824 if (tryFinalizeFinishingState(finishing, proposedUpdate, mode))
825 return COMPLETING_COMPLETED
826 // otherwise retry
827 return COMPLETING_RETRY
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300828 }
829
830 private val Any?.exceptionOrNull: Throwable?
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300831 get() = (this as? CompletedExceptionally)?.cause
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300832
833 private fun firstChild(state: Incomplete) =
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300834 state as? ChildJob ?: state.list?.nextChild()
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300835
836 // return false when there is no more incomplete children to wait
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300837 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
838 private tailrec fun tryWaitForChild(state: Finishing, child: ChildJob, proposedUpdate: Any?): Boolean {
839 val handle = child.childJob.invokeOnCompletion(
840 invokeImmediately = false,
841 handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
842 )
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300843 if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
844 val nextChild = child.nextChild() ?: return false
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300845 return tryWaitForChild(state, nextChild, proposedUpdate)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300846 }
847
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300848 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
849 private fun continueCompleting(state: Finishing, lastChild: ChildJob, proposedUpdate: Any?) {
850 require(this.state === state) // consistency check -- it cannot change while we are waiting for children
851 // figure out if we need to wait for next child
852 val waitChild = lastChild.nextChild()
853 // try wait for next child
854 if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
855 // no more children to wait -- try update state
856 if (tryFinalizeFinishingState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300857 }
858
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300859 private fun LockFreeLinkedListNode.nextChild(): ChildJob? {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300860 var cur = this
861 while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
862 while (true) {
863 cur = cur.nextNode
864 if (cur.isRemoved) continue
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300865 if (cur is ChildJob) return cur
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300866 if (cur is NodeList) return null // checked all -- no more children
867 }
868 }
869
870 public final override val children: Sequence<Job> get() = buildSequence {
871 val state = this@JobSupport.state
872 when (state) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300873 is ChildJob -> yield(state.childJob)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300874 is Incomplete -> state.list?.let { list ->
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300875 list.forEach<ChildJob> { yield(it.childJob) }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300876 }
877 }
878 }
879
880 @Suppress("OverridingDeprecatedMember")
Roman Elizarov5d18d022018-09-22 22:13:05 +0300881 public final override fun attachChild(child: Job): ChildHandle {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300882 /*
883 * Note: This function attaches a special ChildNode object. This node object
884 * is handled in a special way on completion on the coroutine (we wait for all of them) and
885 * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
Vsevolod Tolstopyatov409ed262018-09-24 17:48:20 +0300886 * if the job is already failing. For "failing" state child is attached under state lock.
887 * It's required to properly wait all children before completion and provide linearizable hierarchy view:
888 * If child is attached when job is failing, such child will receive immediate cancellation exception,
889 * but parent *will* wait for that child before completion and will handle its exception.
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300890 */
Roman Elizarov5d18d022018-09-22 22:13:05 +0300891 return invokeOnCompletion(onFailing = true, handler = ChildJob(this, child).asHandler) as ChildHandle
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300892 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300893
894 @Suppress("OverridingDeprecatedMember")
895 public final override fun cancelChildren(cause: Throwable?) {
896 this.cancelChildren(cause) // use extension function
897 }
898
899 /**
900 * Override to process any exceptions that were encountered while invoking completion handlers
901 * installed via [invokeOnCompletion].
902 * @suppress **This is unstable API and it is subject to change.**
903 */
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300904 internal open fun handleOnCompletionException(exception: Throwable) {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300905 throw exception
906 }
907
908 /**
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300909 * This function is invoked once when job is failing or is completed.
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300910 * It's an optimization for [invokeOnCompletion] with `onCancelling` set to `true`.
911 *
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300912 * @suppress **This is unstable API and it is subject to change.*
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300913 */
Roman Elizarovf13cbae2018-09-24 19:48:47 +0300914 protected open fun onFailing(cause: Throwable?) {}
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300915
Roman Elizarov5d18d022018-09-22 22:13:05 +0300916 /**
917 * When this function returns `true` the parent fails on the failure of this job.
918 *
919 * @suppress **This is unstable API and it is subject to change.*
920 */
921 protected open val failsParent: Boolean get() = false
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300922
923 /**
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300924 * Returns `true` for jobs that handle their exceptions via [handleJobException] or integrate them
925 * into the job's result via [onCompletionInternal]. The only instance of the [Job] that does not
926 * handle its exceptions is [JobImpl].
927 *
928 * @suppress **This is unstable API and it is subject to change.*
Roman Elizarov67912f92018-09-16 01:46:43 +0300929 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300930 protected open val handlesException: Boolean get() = true
Roman Elizarov67912f92018-09-16 01:46:43 +0300931
932 /**
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300933 * This method is invoked **exactly once** when the final exception of the job is determined
934 * and before it becomes complete. At the moment of invocation the job and all its children are complete.
935 *
936 * @suppress **This is unstable API and it is subject to change.*
Roman Elizarov67912f92018-09-16 01:46:43 +0300937 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300938 protected open fun handleJobException(exception: Throwable) {}
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300939
Roman Elizarov5d18d022018-09-22 22:13:05 +0300940 private fun failParent(cause: Throwable): Boolean {
941 if (cause is CancellationException) return true
942 if (!failsParent) return false
943 return parentHandle?.childFailed(cause) == true
944 }
945
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300946 /**
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300947 * Override for post-completion actions that need to do something with the state.
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300948 * @param state the final state.
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300949 * @param mode completion mode.
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300950 * @param suppressed true when any exceptions were suppressed while building the final completion cause.
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300951 * @suppress **This is unstable API and it is subject to change.**
952 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300953 internal open fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300954
955 // for nicer debugging
956 public override fun toString(): String =
957 "${nameString()}{${stateString()}}@$hexAddress"
958
959 /**
960 * @suppress **This is unstable API and it is subject to change.**
961 */
962 internal open fun nameString(): String = classSimpleName
963
964 private fun stateString(): String {
965 val state = this.state
966 return when (state) {
967 is Finishing -> buildString {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300968 when { // cancelling implies failing
969 state.isCancelling -> append("Cancelling")
970 state.isFailing -> append("Failing")
971 else -> append("Active")
972 }
973 if (state.isCompleting) append("Completing")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300974 }
975 is Incomplete -> if (state.isActive) "Active" else "New"
976 is Cancelled -> "Cancelled"
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300977 is CompletedExceptionally -> "Failed"
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300978 else -> "Completed"
979 }
980 }
981
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300982 // Completing, Failing, Cancelling states,
983 // All updates are guarded by synchronized(this), reads are volatile
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +0300984 @Suppress("UNCHECKED_CAST")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300985 private class Finishing(
986 override val list: NodeList,
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300987 @Volatile
988 @JvmField var isCancelling: Boolean,
989 @Volatile
990 @JvmField var isCompleting: Boolean,
991 @Volatile
992 @JvmField var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
Roman Elizarov4e33cc62018-09-05 01:15:44 +0300993 ) : SynchronizedObject(), Incomplete {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300994 @Volatile
995 private var _exceptionsHolder: Any? = null // Contains null | Throwable | ArrayList | SEALED
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300996
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300997 // NotE: cannot be modified when sealed
998 val isSealed: Boolean get() = _exceptionsHolder === SEALED
999 val isFailing: Boolean get() = rootCause != null
1000 override val isActive: Boolean get() = !isFailing
1001
1002 // Seals current state and returns list of exceptions
Roman Elizarov563da402018-08-10 19:18:56 +03001003 // guarded by `synchronized(this)`
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001004 fun sealLocked(proposedException: Throwable?): List<Throwable> {
1005 val eh = _exceptionsHolder // volatile read
1006 val list = when(eh) {
1007 null -> allocateList()
1008 is Throwable -> allocateList().also { it.add(eh) }
1009 is ArrayList<*> -> eh as ArrayList<Throwable>
1010 else -> error("State is $eh") // already sealed -- cannot happen
1011 }
1012 val rootCause = this.rootCause // volatile read
1013 rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
1014 if (proposedException != null && proposedException != rootCause) list.add(proposedException)
1015 _exceptionsHolder = SEALED
1016 return list
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +03001017 }
1018
Roman Elizarov563da402018-08-10 19:18:56 +03001019 // guarded by `synchronized(this)`
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001020 fun addExceptionLocked(exception: Throwable) {
1021 val rootCause = this.rootCause // volatile read
1022 if (rootCause == null) {
1023 this.rootCause = exception
1024 return
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +03001025 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001026 if (exception === rootCause) return // nothing to do
1027 val eh = _exceptionsHolder // volatile read
1028 when (eh) {
1029 null -> _exceptionsHolder = exception
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +03001030 is Throwable -> {
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001031 if (exception === eh) return // nothing to do
1032 _exceptionsHolder = allocateList().apply {
1033 add(eh)
Roman Elizarov563da402018-08-10 19:18:56 +03001034 add(exception)
1035
1036 }
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +03001037 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001038 is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception)
1039 else -> error("State is $eh") // already sealed -- cannot happen
Vsevolod Tolstopyatov91ecee82018-08-07 18:24:00 +03001040 }
1041 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001042
1043 private fun allocateList() = ArrayList<Throwable>(4)
1044
1045 override fun toString(): String =
1046 "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$_exceptionsHolder, list=$list]"
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001047 }
1048
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001049 private val Incomplete.isFailing: Boolean
1050 get() = this is Finishing && isFailing
1051
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001052 private val Incomplete.isCancelling: Boolean
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001053 get() = this is Finishing && isCancelling
1054
1055 // Used by parent that is waiting for child completion
1056 private class ChildCompletion(
1057 private val parent: JobSupport,
1058 private val state: Finishing,
1059 private val child: ChildJob,
1060 private val proposedUpdate: Any?
1061 ) : JobNode<Job>(child.childJob) {
1062 override fun invoke(cause: Throwable?) {
1063 parent.continueCompleting(state, child, proposedUpdate)
1064 }
1065 override fun toString(): String =
1066 "ChildCompletion[$child, $proposedUpdate]"
1067 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001068
1069 /*
1070 * =================================================================================================
1071 * This is ready-to-use implementation for Deferred interface.
1072 * However, it is not type-safe. Conceptually it just exposes the value of the underlying
1073 * completed state as `Any?`
1074 * =================================================================================================
1075 */
1076
1077 public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
1078
1079 public fun getCompletionExceptionOrNull(): Throwable? {
1080 val state = this.state
1081 check(state !is Incomplete) { "This job has not completed yet" }
1082 return state.exceptionOrNull
1083 }
1084
1085 /**
1086 * @suppress **This is unstable API and it is subject to change.**
1087 */
1088 internal fun getCompletedInternal(): Any? {
1089 val state = this.state
1090 check(state !is Incomplete) { "This job has not completed yet" }
Roman Elizarov6d9f40f2018-04-28 14:44:02 +03001091 if (state is CompletedExceptionally) throw state.cause
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001092 return state
1093 }
1094
1095 /**
1096 * @suppress **This is unstable API and it is subject to change.**
1097 */
1098 internal suspend fun awaitInternal(): Any? {
1099 // fast-path -- check state (avoid extra object creation)
1100 while(true) { // lock-free loop on state
1101 val state = this.state
1102 if (state !is Incomplete) {
1103 // already complete -- just return result
Roman Elizarov6d9f40f2018-04-28 14:44:02 +03001104 if (state is CompletedExceptionally) throw state.cause
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001105 return state
1106
1107 }
1108 if (startInternal(state) >= 0) break // break unless needs to retry
1109 }
1110 return awaitSuspend() // slow-path
1111 }
1112
1113 private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +03001114 // We have to invoke await() handler only on cancellation, on completion we will be resumed regularly without handlers
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +03001115 cont.disposeOnCancellation(invokeOnCompletion {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001116 val state = this.state
1117 check(state !is Incomplete)
1118 if (state is CompletedExceptionally)
Roman Elizarov6d9f40f2018-04-28 14:44:02 +03001119 cont.resumeWithException(state.cause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001120 else
1121 cont.resume(state)
1122 })
1123 }
1124
1125 /**
1126 * @suppress **This is unstable API and it is subject to change.**
1127 */
1128 // registerSelectAwaitInternal
1129 @Suppress("UNCHECKED_CAST")
1130 internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) {
1131 // fast-path -- check state and select/return if needed
1132 loopOnState { state ->
1133 if (select.isSelected) return
1134 if (state !is Incomplete) {
1135 // already complete -- select result
1136 if (select.trySelect(null)) {
1137 if (state is CompletedExceptionally)
Roman Elizarov6d9f40f2018-04-28 14:44:02 +03001138 select.resumeSelectCancellableWithException(state.cause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001139 else
Roman Elizarov7587eba2018-07-25 12:22:46 +03001140 block.startCoroutineUnintercepted(state as T, select.completion)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001141 }
1142 return
1143 }
1144 if (startInternal(state) == 0) {
1145 // slow-path -- register waiter for completion
1146 select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(this, select, block).asHandler))
1147 return
1148 }
1149 }
1150 }
1151
1152 /**
1153 * @suppress **This is unstable API and it is subject to change.**
1154 */
1155 @Suppress("UNCHECKED_CAST")
1156 internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
1157 val state = this.state
1158 // Note: await is non-atomic (can be cancelled while dispatched)
1159 if (state is CompletedExceptionally)
Roman Elizarov6d9f40f2018-04-28 14:44:02 +03001160 select.resumeSelectCancellableWithException(state.cause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001161 else
1162 block.startCoroutineCancellable(state as T, select.completion)
1163 }
1164}
1165
Roman Elizarov563da402018-08-10 19:18:56 +03001166// --------------- helper classes & constants for job implementation
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001167
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001168private const val COMPLETING_ALREADY_COMPLETING = 0
1169private const val COMPLETING_COMPLETED = 1
1170private const val COMPLETING_WAITING_CHILDREN = 2
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001171private const val COMPLETING_RETRY = 3
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001172
1173private const val RETRY = -1
1174private const val FALSE = 0
1175private const val TRUE = 1
1176
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001177private val SEALED = Symbol("SEALED")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001178
Roman Elizarov563da402018-08-10 19:18:56 +03001179private val EMPTY_NEW = Empty(false)
1180private val EMPTY_ACTIVE = Empty(true)
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +03001181
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001182private class Empty(override val isActive: Boolean) : Incomplete {
1183 override val list: NodeList? get() = null
1184 override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
1185}
1186
1187internal class JobImpl(parent: Job? = null) : JobSupport(true) {
1188 init { initParentJobInternal(parent) }
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001189 override val onFailComplete get() = true
1190 override val handlesException: Boolean get() = false
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001191}
1192
1193// -------- invokeOnCompletion nodes
1194
1195internal interface Incomplete {
1196 val isActive: Boolean
1197 val list: NodeList? // is null only for Empty and JobNode incomplete state objects
1198}
1199
Roman Elizarov6d9f40f2018-04-28 14:44:02 +03001200internal abstract class JobNode<out J : Job>(
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001201 @JvmField val job: J
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +03001202) : CompletionHandlerBase(), DisposableHandle, Incomplete {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001203 override val isActive: Boolean get() = true
1204 override val list: NodeList? get() = null
1205 override fun dispose() = (job as JobSupport).removeNode(this)
1206}
1207
Roman Elizarovede29232018-09-18 12:53:09 +03001208internal class NodeList : LockFreeLinkedListHead(), Incomplete {
1209 override val isActive: Boolean get() = true
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001210 override val list: NodeList get() = this
1211
Roman Elizarovede29232018-09-18 12:53:09 +03001212 fun getString(state: String) = buildString {
1213 append("List{")
1214 append(state)
1215 append("}[")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001216 var first = true
1217 this@NodeList.forEach<JobNode<*>> { node ->
1218 if (first) first = false else append(", ")
1219 append(node)
1220 }
1221 append("]")
1222 }
Roman Elizarovede29232018-09-18 12:53:09 +03001223
1224 override fun toString(): String = getString("Active")
1225}
1226
1227internal class InactiveNodeList(
1228 override val list: NodeList
1229) : Incomplete {
1230 override val isActive: Boolean get() = false
1231 override fun toString(): String = list.getString("New")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001232}
1233
1234private class InvokeOnCompletion(
1235 job: Job,
1236 private val handler: CompletionHandler
1237) : JobNode<Job>(job) {
1238 override fun invoke(cause: Throwable?) = handler.invoke(cause)
1239 override fun toString() = "InvokeOnCompletion[$classSimpleName@$hexAddress]"
1240}
1241
1242private class ResumeOnCompletion(
1243 job: Job,
1244 private val continuation: Continuation<Unit>
1245) : JobNode<Job>(job) {
1246 override fun invoke(cause: Throwable?) = continuation.resume(Unit)
1247 override fun toString() = "ResumeOnCompletion[$continuation]"
1248}
1249
1250internal class DisposeOnCompletion(
1251 job: Job,
1252 private val handle: DisposableHandle
1253) : JobNode<Job>(job) {
1254 override fun invoke(cause: Throwable?) = handle.dispose()
1255 override fun toString(): String = "DisposeOnCompletion[$handle]"
1256}
1257
1258private class SelectJoinOnCompletion<R>(
1259 job: JobSupport,
1260 private val select: SelectInstance<R>,
1261 private val block: suspend () -> R
1262) : JobNode<JobSupport>(job) {
1263 override fun invoke(cause: Throwable?) {
1264 if (select.trySelect(null))
1265 block.startCoroutineCancellable(select.completion)
1266 }
1267 override fun toString(): String = "SelectJoinOnCompletion[$select]"
1268}
1269
1270private class SelectAwaitOnCompletion<T, R>(
1271 job: JobSupport,
1272 private val select: SelectInstance<R>,
1273 private val block: suspend (T) -> R
1274) : JobNode<JobSupport>(job) {
1275 override fun invoke(cause: Throwable?) {
1276 if (select.trySelect(null))
1277 job.selectAwaitCompletion(select, block)
1278 }
1279 override fun toString(): String = "SelectAwaitOnCompletion[$select]"
1280}
1281
1282// -------- invokeOnCancellation nodes
1283
1284/**
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001285 * Marker for node that shall be invoked on in _failing_ state.
1286 * **Note: may be invoked multiple times.**
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001287 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001288internal abstract class JobFailingNode<out J : Job>(job: J) : JobNode<J>(job)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001289
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001290private class InvokeOnFailing(
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001291 job: Job,
1292 private val handler: CompletionHandler
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001293) : JobFailingNode<Job>(job) {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001294 // delegate handler shall be invoked at most once, so here is an additional flag
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001295 private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001296 override fun invoke(cause: Throwable?) {
1297 if (_invoked.compareAndSet(0, 1)) handler.invoke(cause)
1298 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001299 override fun toString() = "InvokeOnFailing[$classSimpleName@$hexAddress]"
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001300}
1301
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +03001302internal class ChildJob(
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001303 parent: JobSupport,
1304 @JvmField val childJob: Job
Roman Elizarov5d18d022018-09-22 22:13:05 +03001305) : JobFailingNode<JobSupport>(parent), ChildHandle {
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001306 override fun invoke(cause: Throwable?) = childJob.cancelChild(job)
Roman Elizarov5d18d022018-09-22 22:13:05 +03001307 override fun childFailed(cause: Throwable): Boolean = job.childFailed(cause)
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +03001308 override fun toString(): String = "ChildJob[$childJob]"
1309}
1310
1311// Same as ChildJob, but for cancellable continuation
1312internal class ChildContinuation(
1313 parent: Job,
1314 @JvmField val child: AbstractContinuation<*>
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001315) : JobFailingNode<Job>(parent) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +03001316 override fun invoke(cause: Throwable?) {
1317 child.cancel(job.getCancellationException())
1318 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001319 override fun toString(): String =
1320 "ChildContinuation[$child]"
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001321}
1322