blob: 7c1ddf452a5ce18e72fc72f60e199cf2ff31d188 [file] [log] [blame]
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03001/*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03005package kotlinx.coroutines.experimental
6
7import kotlinx.atomicfu.*
8import kotlinx.coroutines.experimental.internal.*
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03009import kotlinx.coroutines.experimental.intrinsics.*
10import kotlinx.coroutines.experimental.selects.*
11import kotlin.coroutines.experimental.*
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030012
13/**
14 * A concrete implementation of [Job]. It is optionally a child to a parent job.
15 * This job is cancelled when the parent is complete, but not vise-versa.
16 *
17 * This is an open class designed for extension by more specific classes that might augment the
18 * state and mare store addition state information for completed jobs, like their result values.
19 *
20 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
21 * @suppress **This is unstable API and it is subject to change.**
22 */
23internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 {
24 final override val key: CoroutineContext.Key<*> get() = Job
25
26 /*
27 === Internal states ===
28
Roman Elizarovecbc85c2018-09-14 12:52:50 +030029 name state class public state description
30 ------ ------------ ------------ -----------
31 EMPTY_N EmptyNew : New no listeners
32 EMPTY_A EmptyActive : Active no listeners
33 SINGLE JobNode : Active a single listener
34 SINGLE+ JobNode : Active a single listener + NodeList added as its next
35 LIST_N InactiveNodeList : New a list of listeners (promoted once, does not got back to EmptyNew)
36 LIST_A NodeList : Active a list of listeners (promoted once, does not got back to JobNode/EmptyActive)
37 COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*)
38 CANCELLING Finishing : Cancelling -- " --
39 FINAL_C Cancelled : Cancelled cancelled (final state)
40 FINAL_F CompletedExceptionally : Completed failed for other reason (final state)
41 FINAL_R <any> : Completed produced some result
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030042
43 === Transitions ===
44
45 New states Active states Inactive states
46
47 +---------+ +---------+ }
Roman Elizarovecbc85c2018-09-14 12:52:50 +030048 | EMPTY_N | ----> | EMPTY_A | ----+ } Empty states
49 +---------+ +---------+ | }
50 | | | ^ | +----------+
51 | | | | +--> | FINAL_* |
52 | | V | | +----------+
53 | | +---------+ | }
54 | | | SINGLE | ----+ } JobNode states
55 | | +---------+ | }
56 | | | | }
57 | | V | }
58 | | +---------+ | }
59 | +-------> | SINGLE+ | ----+ }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030060 | +---------+ | }
61 | | |
62 V V |
63 +---------+ +---------+ | }
Roman Elizarovecbc85c2018-09-14 12:52:50 +030064 | LIST_N | ----> | LIST_A | ----+ } [Inactive]NodeList states
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030065 +---------+ +---------+ | }
66 | | | | |
67 | | +--------+ | |
68 | | | V |
69 | | | +------------+ | +------------+ }
70 | +-------> | COMPLETING | --+-- | CANCELLING | } Finishing states
71 | | +------------+ +------------+ }
72 | | | ^
73 | | | |
74 +--------+---------+--------------------+
75
76
77 This state machine and its transition matrix are optimized for the common case when job is created in active
Roman Elizarovecbc85c2018-09-14 12:52:50 +030078 state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
79 successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
80 state without going to COMPLETING state)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030081
82 Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
83 */
84
85 // Note: use shared objects while we have no listeners
Roman Elizarov563da402018-08-10 19:18:56 +030086 private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030087
88 @Volatile
89 private var parentHandle: DisposableHandle? = null
90
91 // ------------ initialization ------------
92
93 /**
94 * Initializes parent job.
95 * It shall be invoked at most once after construction after all other initialization.
96 * @suppress **This is unstable API and it is subject to change.**
97 */
98 internal fun initParentJobInternal(parent: Job?) {
99 check(parentHandle == null)
100 if (parent == null) {
101 parentHandle = NonDisposableHandle
102 return
103 }
104 parent.start() // make sure the parent is started
105 @Suppress("DEPRECATION")
106 val handle = parent.attachChild(this)
107 parentHandle = handle
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300108 // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300109 if (isCompleted) {
110 handle.dispose()
111 parentHandle = NonDisposableHandle // release it just in case, to aid GC
112 }
113 }
114
115 // ------------ state query ------------
116
117 /**
118 * Returns current state of this job.
119 * @suppress **This is unstable API and it is subject to change.**
120 */
121 internal val state: Any? get() {
122 _state.loop { state -> // helper loop on state (complete in-progress atomic operations)
123 if (state !is OpDescriptor) return state
124 state.perform(this)
125 }
126 }
127
128 /**
129 * @suppress **This is unstable API and it is subject to change.**
130 */
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300131 private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300132 while (true) {
133 block(state)
134 }
135 }
136
Vsevolod Tolstopyatov79414ec2018-08-30 16:50:56 +0300137 public override val isActive: Boolean get() {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300138 val state = this.state
139 return state is Incomplete && state.isActive
140 }
141
142 public final override val isCompleted: Boolean get() = state !is Incomplete
143
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300144 public final override val isFailed: Boolean get() {
145 val state = this.state
146 return state is CompletedExceptionally || (state is Finishing && state.isFailing)
147 }
148
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300149 public final override val isCancelled: Boolean get() {
150 val state = this.state
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300151 return state is Cancelled || (state is Finishing && state.isCancelling)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300152 }
153
154 // ------------ state update ------------
155
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300156 // Finalizes Finishing -> Completed (terminal state) transition.
157 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
158 private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
159 require(proposedUpdate !is Incomplete) // only incomplete -> completed transition is allowed
160 require(this.state === state) // consistency check -- it cannot change
161 require(!state.isSealed) // consistency check -- cannot be sealed yet
162 require(state.isCompleting) // consistency check -- must be marked as completing
163 val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
164 val proposedCancel = proposedUpdate is Cancelled
165 // Create the final exception and seal the state so that no more exceptions can be added
166 var suppressed = false
167 val finalException = synchronized(state) {
168 val exceptions = state.sealLocked(proposedException)
169 val rootCause = getFinalRootCause(state, exceptions)
170 if (rootCause != null) suppressed = suppressExceptions(rootCause, exceptions)
171 rootCause
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +0300172 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300173 // Create the final state object
174 val finalState = when {
175 // if we have not failed -> use proposed update value
176 finalException == null -> proposedUpdate
177 // small optimization when we can used proposeUpdate object as is on failure
178 finalException === proposedException && proposedCancel == state.isCancelling -> proposedUpdate
179 // cancelled job final state
180 state.isCancelling -> Cancelled(finalException)
181 // failed job final state
182 else -> CompletedExceptionally(finalException)
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300183 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300184 // Now handle exception
185 if (finalException != null) {
186 if (!failParent(finalException)) handleJobException(finalException)
187 }
188 // Then CAS to completed state -> it must succeed
189 require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
190 // And process all post-completion actions
191 completeStateFinalization(state, finalState, mode, suppressed)
192 return true
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300193 }
194
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300195 private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
196 // A case of no exceptions
197 if (exceptions.isEmpty()) {
198 // materialize cancellation exception if it was not materialized yet
199 if (state.isCancelling) return createJobCancellationException()
200 return null
201 }
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300202 /*
203 * This is a place where we step on our API limitation:
204 * We can't distinguish internal JobCancellationException from our parent
205 * from external cancellation, thus we ought to collect all exceptions.
206 *
207 * But it has negative consequences: same exception can be added as suppressed more than once.
208 * Consider concurrent parent-child relationship:
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300209 * 1) Child throws E1 and parent throws E2.
210 * 2) Parent goes to "Failing(E1)" and cancels child with E1
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300211 * 3) Child goes to "Cancelling(E1)", but throws an exception E2
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300212 * 4) When child throws, it notifies parent that it is failing, adding its exception to parent's list of exceptions/
213 * 5) Child builds final exception: E1 with suppressed E2, reports it to parent.
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300214 * 6) Parent aggregates three exceptions: original E1, reported E2 and "final" E1.
215 * It filters the third exception, but adds the second one to the first one, thus adding suppressed duplicate.
216 *
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300217 * Note that it's only happening when both parent and child throw exception simultaneously.
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300218 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300219 var rootCause = exceptions[0]
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300220 if (rootCause is JobCancellationException) {
221 val cause = unwrap(rootCause)
222 rootCause = if (cause !== null) {
223 cause
224 } else {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300225 exceptions.firstOrNull { unwrap(it) != null } ?: return rootCause
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300226 }
227 }
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300228 return rootCause
229 }
230
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300231 private fun suppressExceptions(rootCause: Throwable, exceptions: List<Throwable>): Boolean {
232 if (exceptions.size <= 1) return false // nothing more to do here
233 // TODO it should be identity set and optimized for small footprints
234 val seenExceptions = HashSet<Throwable>(exceptions.size)
235 var suppressed = false
236 for (i in 1 until exceptions.size) {
237 val unwrapped = unwrap(exceptions[i])
238 if (unwrapped !== null && unwrapped !== rootCause) {
239 if (seenExceptions.add(unwrapped)) {
240 rootCause.addSuppressedThrowable(unwrapped)
241 suppressed = true
242 }
243 }
244 }
245 return suppressed
246 }
247
Roman Elizarov563da402018-08-10 19:18:56 +0300248 private tailrec fun unwrap(exception: Throwable): Throwable? =
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300249 if (exception is JobCancellationException) {
250 val cause = exception.cause
Roman Elizarov563da402018-08-10 19:18:56 +0300251 if (cause !== null) unwrap(cause) else null
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300252 } else {
Roman Elizarov563da402018-08-10 19:18:56 +0300253 exception
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300254 }
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300255
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300256 // fast-path method to finalize normally completed coroutines without children
257 private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
Roman Elizarov7e238752018-09-20 15:05:41 +0300258 check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300259 check(update !is CompletedExceptionally) // only for normal completion
260 if (!_state.compareAndSet(state, update)) return false
261 completeStateFinalization(state, update, mode, false)
Roman Elizarov563da402018-08-10 19:18:56 +0300262 return true
263 }
264
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300265 // suppressed == true when any exceptions were suppressed while building the final completion cause
266 private fun completeStateFinalization(state: Incomplete, update: Any?, mode: Int, suppressed: Boolean) {
Roman Elizarov563da402018-08-10 19:18:56 +0300267 /*
268 * Now the job in THE FINAL state. We need to properly handle the resulting state.
269 * Order of various invocations here is important.
270 *
Vsevolod Tolstopyatovc7581692018-08-13 17:18:01 +0300271 * 1) Unregister from parent job.
Roman Elizarov563da402018-08-10 19:18:56 +0300272 */
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300273 parentHandle?.let {
274 it.dispose() // volatile read parentHandle _after_ state was updated
275 parentHandle = NonDisposableHandle // release it just in case, to aid GC
276 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300277 val cause = (update as? CompletedExceptionally)?.cause
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300278 /*
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300279 * 2) Invoke onFailing: for resource cancellation resource cancellation etc.
280 * Only notify is was not notified yet.
281 * Note: we do not use notifyFailing here, since we are going to invoke all completion as our next step
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300282 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300283 if (!state.isFailing) onFailing(cause)
Roman Elizarov563da402018-08-10 19:18:56 +0300284 /*
Vsevolod Tolstopyatovc7581692018-08-13 17:18:01 +0300285 * 3) Invoke completion handlers: .join(), callbacks etc.
Roman Elizarov563da402018-08-10 19:18:56 +0300286 * It's important to invoke them only AFTER exception handling, see #208
287 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300288 if (state is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300289 try {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300290 state.invoke(cause)
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300291 } catch (ex: Throwable) {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300292 handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300293 }
294 } else {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300295 state.list?.notifyCompletion(cause)
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300296 }
Roman Elizarov563da402018-08-10 19:18:56 +0300297 /*
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300298 * 4) Invoke onCompletionInternal: onNext(), timeout de-registration etc.
Roman Elizarov563da402018-08-10 19:18:56 +0300299 * It should be last so all callbacks observe consistent state
300 * of the job which doesn't depend on callback scheduling.
301 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300302 onCompletionInternal(update, mode, suppressed)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300303 }
304
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300305 private fun notifyFailing(list: NodeList, cause: Throwable) {
306 // first cancel our own children
307 onFailing(cause)
308 notifyHandlers<JobFailingNode<*>>(list, cause)
309 // then report to the parent that we are failing
310 failParent(cause) // tentative failure report -- does not matter if there is no parent
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300311 }
312
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300313 private fun NodeList.notifyCompletion(cause: Throwable?) =
314 notifyHandlers<JobNode<*>>(this, cause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300315
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300316 private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300317 var exception: Throwable? = null
318 list.forEach<T> { node ->
319 try {
320 node.invoke(cause)
321 } catch (ex: Throwable) {
322 exception?.apply { addSuppressedThrowable(ex) } ?: run {
323 exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
324 }
325 }
326 }
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300327 exception?.let { handleOnCompletionException(it) }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300328 }
329
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300330 public final override fun start(): Boolean {
331 loopOnState { state ->
332 when (startInternal(state)) {
333 FALSE -> return false
334 TRUE -> return true
335 }
336 }
337 }
338
339 // returns: RETRY/FALSE/TRUE:
340 // FALSE when not new,
341 // TRUE when started
342 // RETRY when need to retry
343 private fun startInternal(state: Any?): Int {
344 when (state) {
345 is Empty -> { // EMPTY_X state -- no completion handlers
346 if (state.isActive) return FALSE // already active
Roman Elizarov563da402018-08-10 19:18:56 +0300347 if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300348 onStartInternal()
349 return TRUE
350 }
Roman Elizarovede29232018-09-18 12:53:09 +0300351 is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
352 if (!_state.compareAndSet(state, state.list)) return RETRY
353 onStartInternal()
354 return TRUE
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300355 }
356 else -> return FALSE // not a new state
357 }
358 }
359
360 /**
361 * Override to provide the actual [start] action.
362 * This function is invoked exactly once when non-active coroutine is [started][start].
363 */
364 internal open fun onStartInternal() {}
365
366 public final override fun getCancellationException(): CancellationException {
367 val state = this.state
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300368 return when (state) {
369 is Finishing -> state.rootCause?.toCancellationException("Job is failing")
370 ?: error("Job is still new or active: $this")
371 is Incomplete -> error("Job is still new or active: $this")
372 is CompletedExceptionally -> state.cause.toCancellationException("Job has failed")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300373 else -> JobCancellationException("Job has completed normally", null, this)
374 }
375 }
376
377 private fun Throwable.toCancellationException(message: String): CancellationException =
378 this as? CancellationException ?: JobCancellationException(message, this, this@JobSupport)
379
380 /**
381 * Returns the cause that signals the completion of this job -- it returns the original
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300382 * [cancel] cause, [JobCancellationException] or **`null` if this job had completed normally**.
383 * This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300384 * failing yet.
385 *
386 * @suppress **This is unstable API and it is subject to change.**
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300387 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300388 protected fun getCompletionCause(): Throwable? = loopOnState { state ->
389 return when (state) {
390 is Finishing -> state.rootCause
391 ?: error("Job is still new or active: $this")
392 is Incomplete -> error("Job is still new or active: $this")
393 is CompletedExceptionally -> state.cause
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300394 else -> null
395 }
396 }
397
398 @Suppress("OverridingDeprecatedMember")
399 public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300400 invokeOnCompletion(onFailing = false, invokeImmediately = true, handler = handler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300401
402 @Suppress("OverridingDeprecatedMember")
403 public final override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle =
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300404 invokeOnCompletion(onFailing = onCancelling, invokeImmediately = true, handler = handler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300405
406 @Suppress("OverridingDeprecatedMember")
407 public final override fun invokeOnCompletion(onCancelling_: Boolean, handler: CompletionHandler): DisposableHandle =
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300408 invokeOnCompletion(onFailing = onCancelling_, invokeImmediately = true, handler = handler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300409
410 // todo: non-final as a workaround for KT-21968, should be final in the future
411 public override fun invokeOnCompletion(
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300412 onFailing: Boolean,
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300413 invokeImmediately: Boolean,
414 handler: CompletionHandler
415 ): DisposableHandle {
416 var nodeCache: JobNode<*>? = null
417 loopOnState { state ->
418 when (state) {
419 is Empty -> { // EMPTY_X state -- no completion handlers
420 if (state.isActive) {
421 // try move to SINGLE state
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300422 val node = nodeCache ?: makeNode(handler, onFailing).also { nodeCache = it }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300423 if (_state.compareAndSet(state, node)) return node
424 } else
425 promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
426 }
427 is Incomplete -> {
428 val list = state.list
429 if (list == null) { // SINGLE/SINGLE+
430 promoteSingleToNodeList(state as JobNode<*>)
431 } else {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300432 var rootCause: Throwable? = null
433 var handle: DisposableHandle = NonDisposableHandle
434 if (onFailing && state is Finishing) {
435 synchronized(state) {
436 // check if we are installing failing handler on job that is failing
437 rootCause = state.rootCause // != null if we are failing
438 // We add node to the list in two cases --- either the job is not failing
439 // or we are adding a child to a coroutine that is not completing yet
440 if (rootCause == null || handler.isHandlerOf<ChildJob>() && !state.isCompleting) {
441 // Note: add node the list while holding lock on state (make sure it cannot change)
442 val node = nodeCache ?: makeNode(handler, onFailing).also { nodeCache = it }
443 if (!addLastAtomic(state, list, node)) return@loopOnState // retry
444 // just return node if we don't have to invoke handler (not failing yet)
445 if (rootCause == null) return node
446 // otherwise handler is invoked immediately out of the synchronized section & handle returned
447 handle = node
448 }
449 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300450 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300451 if (rootCause != null) {
452 // Note: attachChild uses invokeImmediately, so it gets invoked when adding to failing job
453 if (invokeImmediately) handler.invokeIt(rootCause)
454 return handle
455 } else {
456 val node = nodeCache ?: makeNode(handler, onFailing).also { nodeCache = it }
457 if (addLastAtomic(state, list, node)) return node
458 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300459 }
460 }
461 else -> { // is complete
462 // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
463 // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
464 if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
465 return NonDisposableHandle
466 }
467 }
468 }
469 }
470
471 private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300472 return if (onCancelling)
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300473 (handler as? JobFailingNode<*>)?.also { require(it.job === this) }
474 ?: InvokeOnFailing(this, handler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300475 else
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300476 (handler as? JobNode<*>)?.also { require(it.job === this && it !is JobFailingNode) }
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300477 ?: InvokeOnCompletion(this, handler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300478 }
479
480 private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>) =
481 list.addLastIf(node) { this.state === expect }
482
483 private fun promoteEmptyToNodeList(state: Empty) {
Roman Elizarovede29232018-09-18 12:53:09 +0300484 // try to promote it to LIST state with the corresponding state
485 val list = NodeList()
486 val update = if (state.isActive) list else InactiveNodeList(list)
487 _state.compareAndSet(state, update)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300488 }
489
490 private fun promoteSingleToNodeList(state: JobNode<*>) {
491 // try to promote it to list (SINGLE+ state)
Roman Elizarovede29232018-09-18 12:53:09 +0300492 state.addOneIfEmpty(NodeList())
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300493 // it must be in SINGLE+ state or state has changed (node could have need removed from state)
494 val list = state.nextNode // either our NodeList or somebody else won the race, updated state
495 // just attempt converting it to list if state is still the same, then we'll continue lock-free loop
496 _state.compareAndSet(state, list)
497 }
498
499 public final override suspend fun join() {
500 if (!joinInternal()) { // fast-path no wait
Roman Elizarov222f3f22018-07-13 18:47:17 +0300501 coroutineContext.checkCompletion()
502 return // do not suspend
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300503 }
504 return joinSuspend() // slow-path wait
505 }
506
507 private fun joinInternal(): Boolean {
508 loopOnState { state ->
509 if (state !is Incomplete) return false // not active anymore (complete) -- no need to wait
510 if (startInternal(state) >= 0) return true // wait unless need to retry
511 }
512 }
513
514 private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300515 // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
516 cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300517 }
518
519 public final override val onJoin: SelectClause0
520 get() = this
521
522 // registerSelectJoin
523 public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
524 // fast-path -- check state and select/return if needed
525 loopOnState { state ->
526 if (select.isSelected) return
527 if (state !is Incomplete) {
528 // already complete -- select result
529 if (select.trySelect(null)) {
530 select.completion.context.checkCompletion() // always check for our completion
Roman Elizarov7587eba2018-07-25 12:22:46 +0300531 block.startCoroutineUnintercepted(select.completion)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300532 }
533 return
534 }
535 if (startInternal(state) == 0) {
536 // slow-path -- register waiter for completion
537 select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(this, select, block).asHandler))
538 return
539 }
540 }
541 }
542
543 /**
544 * @suppress **This is unstable API and it is subject to change.**
545 */
546 internal fun removeNode(node: JobNode<*>) {
547 // remove logic depends on the state of the job
548 loopOnState { state ->
549 when (state) {
550 is JobNode<*> -> { // SINGE/SINGLE+ state -- one completion handler
551 if (state !== node) return // a different job node --> we were already removed
552 // try remove and revert back to empty state
Roman Elizarov563da402018-08-10 19:18:56 +0300553 if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300554 }
555 is Incomplete -> { // may have a list of completion handlers
556 // remove node from the list if there is a list
557 if (state.list != null) node.remove()
558 return
559 }
560 else -> return // it is complete and does not have any completion handlers
561 }
562 }
563 }
564
565 /**
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300566 * Returns `true` for job that do not have "body block" to complete and should immediately go into
567 * completing state and start waiting for children.
568 *
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300569 * @suppress **This is unstable API and it is subject to change.**
570 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300571 internal open val onFailComplete: Boolean get() = false
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300572
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300573 // external cancel with (optional) cause
574 public override fun cancel(cause: Throwable?): Boolean =
575 fail(cause, cancel = true) && handlesException
576
577 // child is reporting failure to the parent
578 public override fun childFailed(cause: Throwable) =
579 fail(cause, cancel = false) && handlesException
580
581 // parent is cancelling child
582 public override fun cancelChild(parentJob: Job) {
583 fail(parentJob, cancel = true)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300584 }
585
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300586 // cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
587 // returns true is exception was handled, false otherwise
588 private fun fail(cause: Any?, cancel: Boolean): Boolean {
589 if (onFailComplete) {
590 // make sure it is completing, if failMakeCompleting returns true it means it had make it
591 // completing and had recorded exception
592 if (failMakeCompleting(cause, cancel)) return true
593 // otherwise just record failure via makeFailing below
594 }
595 return makeFailing(cause, cancel)
596 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300597
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300598 private fun failMakeCompleting(cause: Any?, cancel: Boolean): Boolean {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300599 loopOnState { state ->
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300600 if (state !is Incomplete || state is Finishing && state.isCompleting) {
601 return false // already completed/completing, do not even propose update
602 }
603 val proposedUpdate = createFailure(createCauseException(cause), cancel)
604 when (tryMakeCompleting(state, proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
605 COMPLETING_ALREADY_COMPLETING -> return false
606 COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true
607 COMPLETING_RETRY -> return@loopOnState
608 else -> error("unexpected result")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300609 }
610 }
611 }
612
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300613 private fun createJobCancellationException() =
614 JobCancellationException("Job was cancelled", null, this)
615
616 // cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
617 private fun createCauseException(cause: Any?): Throwable = when(cause) {
618 is Throwable? -> cause ?: createJobCancellationException()
619 else -> (cause as Job).getCancellationException()
620 }
621
622 private fun createFailure(causeException: Throwable, cancel: Boolean): CompletedExceptionally =
623 when {
624 cancel -> Cancelled(causeException)
625 else -> CompletedExceptionally(causeException)
626 }
627
628 // transitions to Failing state
629 // cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
630 private fun makeFailing(cause: Any?, cancel: Boolean): Boolean {
631 var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
632 loopOnState { state ->
633 when (state) {
634 is Finishing -> { // already finishing -- collect exceptions
635 var notifyRootCause: Throwable? = null
636 synchronized(state) {
637 if (state.isSealed) return false // too late, already sealed -- cannot add exception nor mark cancelled
638 // add exception, do nothing is parent is cancelling child that is already failing
639 val wasFailing = state.isFailing // will notify if was not failing
640 // Materialize missing exception if it is the first exception (otherwise -- don't)
641 if (cause != null || !wasFailing) {
642 val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
643 state.addExceptionLocked(causeException)
644 }
645 // mark as cancelling if cancel was requested
646 if (cancel) state.isCancelling = true
647 // take cause for notification is was not failing before
648 notifyRootCause = state.rootCause.takeIf { !wasFailing }
649 }
650 notifyRootCause?.let { notifyFailing(state.list, it) }
651 return true
652 }
653 is Incomplete -> {
654 // Not yet finishing -- try to make it failing
655 val list = tryPromoteToList(state) ?: return@loopOnState
656 val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
657 if (state.isActive) {
658 // active state becomes failing
659 if (tryMakeFailing(state, list, causeException, cancel)) return true
660 } else {
661 // non active state starts completing
662 when (tryMakeCompleting(state, createFailure(causeException, cancel), mode = MODE_ATOMIC_DEFAULT)) {
663 COMPLETING_ALREADY_COMPLETING -> error("Cannot happen in $state")
664 COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true // ok
665 COMPLETING_RETRY -> return@loopOnState
666 else -> error("unexpected result")
667 }
668 }
669 }
670 else -> return false // already complete
671 }
672 }
673 }
674
675 // Performs promotion of incomplete coroutine state to NodeList, returns null when need to retry
676 private fun tryPromoteToList(state: Incomplete): NodeList? = state.list ?: null.also {
677 when (state) {
678 is Empty -> promoteEmptyToNodeList(state)
679 is JobNode<*> -> promoteSingleToNodeList(state)
680 else -> error("State should have list: $state")
681 }
682 }
683
684 // try make new failing state on the condition that we're still in the expected state
685 private fun tryMakeFailing(state: Incomplete, list: NodeList, rootCause: Throwable, cancel: Boolean): Boolean {
686 check(state !is Finishing) // only for non-finishing states
687 check(state.isActive) // only for active states
688 // Create failing state (with rootCause!)
689 val failing = Finishing(list, cancel, false, rootCause)
690 if (!_state.compareAndSet(state, failing)) return false
691 // Notify listeners
692 notifyFailing(list, rootCause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300693 return true
694 }
695
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300696 /**
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300697 * This function is used by [CompletableDeferred.complete] (and exceptionally) and by [JobImpl.cancel].
698 * It returns `false` on repeated invocation (when this job is already completing).
699 *
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300700 * @suppress **This is unstable API and it is subject to change.**
701 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300702 internal fun makeCompleting(proposedUpdate: Any?): Boolean = loopOnState { state ->
703 when (tryMakeCompleting(state, proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
704 COMPLETING_ALREADY_COMPLETING -> return false
705 COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true
706 COMPLETING_RETRY -> return@loopOnState
707 else -> error("unexpected result")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300708 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300709 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300710
711 /**
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300712 * This function is used by [AbstractCoroutine.resume].
713 * It throws exception on repeated invocation (when this job is already completing).
714 *
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300715 * Returns:
716 * * `true` if state was updated to completed/cancelled;
717 * * `false` if made completing or it is cancelling and is waiting for children.
718 *
719 * @throws IllegalStateException if job is already complete or completing
720 * @suppress **This is unstable API and it is subject to change.**
721 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300722 internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean = loopOnState { state ->
723 when (tryMakeCompleting(state, proposedUpdate, mode)) {
724 COMPLETING_ALREADY_COMPLETING -> throw IllegalStateException("Job $this is already complete or completing, " +
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300725 "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300726 COMPLETING_COMPLETED -> return true
727 COMPLETING_WAITING_CHILDREN -> return false
728 COMPLETING_RETRY -> return@loopOnState
729 else -> error("unexpected result")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300730 }
731 }
732
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300733 private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?, mode: Int): Int {
734 if (state !is Incomplete)
735 return COMPLETING_ALREADY_COMPLETING
736 // find first child
737 val child = firstChild(state)
Roman Elizarov7e238752018-09-20 15:05:41 +0300738 // FAST PATH -- no children to wait for && simple state (no list) && not failing => can complete immediately
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300739 // Failures always have to go through Finishing state to serialize exception handling
Roman Elizarov7e238752018-09-20 15:05:41 +0300740 if (child == null && (state is Empty || state is JobNode<*>) && proposedUpdate !is CompletedExceptionally) {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300741 if (!tryFinalizeSimpleState(state, proposedUpdate, mode)) return COMPLETING_RETRY
742 return COMPLETING_COMPLETED
743 }
744 // get state's list or else promote to list to correctly operate on child lists
745 val list = tryPromoteToList(state) ?: return COMPLETING_RETRY
746 // promote to Finishing state if we are not in it yet
747 // This promotion has to be atomic w.r.t to state change, so that a coroutine that is not active yet
748 // atomically transition to finishing & completing state
749 val finishing = state as? Finishing ?: Finishing(list, false, false, null)
750 // must synchronize updates to finishing state
751 var notifyRootCause: Throwable? = null
752 synchronized(finishing) {
753 // check if this state is already completing
754 if (finishing.isCompleting) return COMPLETING_ALREADY_COMPLETING
755 // mark as completing
756 finishing.isCompleting = true
757 // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
758 require(!finishing.isSealed) // cannot be sealed
759 // mark as cancelling is the proposed update is to cancel
760 if (proposedUpdate is Cancelled) finishing.isCancelling = true
761 // add new proposed exception to the finishing state
762 val wasFailing = finishing.isFailing
763 (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
764 // If it just becomes failing --> must process failing notifications
765 notifyRootCause = finishing.rootCause.takeIf { !wasFailing }
766 }
767 // if we need to promote to finishing then atomically do it here
768 if (finishing !== state) {
769 if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
770 }
771 // process failing notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)
772 notifyRootCause?.let { notifyFailing(list, it) }
773 // now wait for children
774 if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
775 return COMPLETING_WAITING_CHILDREN
776 // otherwise -- we have not children left (all were already cancelled?)
777 if (tryFinalizeFinishingState(finishing, proposedUpdate, mode))
778 return COMPLETING_COMPLETED
779 // otherwise retry
780 return COMPLETING_RETRY
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300781 }
782
783 private val Any?.exceptionOrNull: Throwable?
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300784 get() = (this as? CompletedExceptionally)?.cause
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300785
786 private fun firstChild(state: Incomplete) =
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300787 state as? ChildJob ?: state.list?.nextChild()
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300788
789 // return false when there is no more incomplete children to wait
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300790 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
791 private tailrec fun tryWaitForChild(state: Finishing, child: ChildJob, proposedUpdate: Any?): Boolean {
792 val handle = child.childJob.invokeOnCompletion(
793 invokeImmediately = false,
794 handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
795 )
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300796 if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
797 val nextChild = child.nextChild() ?: return false
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300798 return tryWaitForChild(state, nextChild, proposedUpdate)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300799 }
800
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300801 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
802 private fun continueCompleting(state: Finishing, lastChild: ChildJob, proposedUpdate: Any?) {
803 require(this.state === state) // consistency check -- it cannot change while we are waiting for children
804 // figure out if we need to wait for next child
805 val waitChild = lastChild.nextChild()
806 // try wait for next child
807 if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
808 // no more children to wait -- try update state
809 if (tryFinalizeFinishingState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300810 }
811
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300812 private fun LockFreeLinkedListNode.nextChild(): ChildJob? {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300813 var cur = this
814 while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
815 while (true) {
816 cur = cur.nextNode
817 if (cur.isRemoved) continue
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300818 if (cur is ChildJob) return cur
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300819 if (cur is NodeList) return null // checked all -- no more children
820 }
821 }
822
823 public final override val children: Sequence<Job> get() = buildSequence {
824 val state = this@JobSupport.state
825 when (state) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300826 is ChildJob -> yield(state.childJob)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300827 is Incomplete -> state.list?.let { list ->
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300828 list.forEach<ChildJob> { yield(it.childJob) }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300829 }
830 }
831 }
832
833 @Suppress("OverridingDeprecatedMember")
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300834 public final override fun attachChild(child: Job): DisposableHandle {
835 /*
836 * Note: This function attaches a special ChildNode object. This node object
837 * is handled in a special way on completion on the coroutine (we wait for all of them) and
838 * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
839 * if the job is already failing.
840 */
841 return invokeOnCompletion(onFailing = true, handler = ChildJob(this, child).asHandler)
842 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300843
844 @Suppress("OverridingDeprecatedMember")
845 public final override fun cancelChildren(cause: Throwable?) {
846 this.cancelChildren(cause) // use extension function
847 }
848
849 /**
850 * Override to process any exceptions that were encountered while invoking completion handlers
851 * installed via [invokeOnCompletion].
852 * @suppress **This is unstable API and it is subject to change.**
853 */
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300854 internal open fun handleOnCompletionException(exception: Throwable) {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300855 throw exception
856 }
857
858 /**
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300859 * This function is invoked once when job is failing or is completed.
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300860 * It's an optimization for [invokeOnCompletion] with `onCancelling` set to `true`.
861 *
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300862 * @suppress **This is unstable API and it is subject to change.*
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300863 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300864 internal open fun onFailing(cause: Throwable?) {}
865
866 // todo: make it for all kinds of coroutines, now only launch & actor override and handleExceptionViaJob
867 internal open fun failParent(exception: Throwable): Boolean = false
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300868
869 /**
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300870 * Returns `true` for jobs that handle their exceptions via [handleJobException] or integrate them
871 * into the job's result via [onCompletionInternal]. The only instance of the [Job] that does not
872 * handle its exceptions is [JobImpl].
873 *
874 * @suppress **This is unstable API and it is subject to change.*
Roman Elizarov67912f92018-09-16 01:46:43 +0300875 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300876 protected open val handlesException: Boolean get() = true
Roman Elizarov67912f92018-09-16 01:46:43 +0300877
878 /**
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300879 * This method is invoked **exactly once** when the final exception of the job is determined
880 * and before it becomes complete. At the moment of invocation the job and all its children are complete.
881 *
882 * @suppress **This is unstable API and it is subject to change.*
Roman Elizarov67912f92018-09-16 01:46:43 +0300883 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300884 protected open fun handleJobException(exception: Throwable) {}
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300885
886 /**
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300887 * Override for post-completion actions that need to do something with the state.
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300888 * @param state the final state.
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300889 * @param mode completion mode.
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300890 * @param suppressed true when any exceptions were suppressed while building the final completion cause.
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300891 * @suppress **This is unstable API and it is subject to change.**
892 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300893 internal open fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300894
895 // for nicer debugging
896 public override fun toString(): String =
897 "${nameString()}{${stateString()}}@$hexAddress"
898
899 /**
900 * @suppress **This is unstable API and it is subject to change.**
901 */
902 internal open fun nameString(): String = classSimpleName
903
904 private fun stateString(): String {
905 val state = this.state
906 return when (state) {
907 is Finishing -> buildString {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300908 when { // cancelling implies failing
909 state.isCancelling -> append("Cancelling")
910 state.isFailing -> append("Failing")
911 else -> append("Active")
912 }
913 if (state.isCompleting) append("Completing")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300914 }
915 is Incomplete -> if (state.isActive) "Active" else "New"
916 is Cancelled -> "Cancelled"
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300917 is CompletedExceptionally -> "Failed"
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300918 else -> "Completed"
919 }
920 }
921
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300922 // Completing, Failing, Cancelling states,
923 // All updates are guarded by synchronized(this), reads are volatile
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +0300924 @Suppress("UNCHECKED_CAST")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300925 private class Finishing(
926 override val list: NodeList,
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300927 @Volatile
928 @JvmField var isCancelling: Boolean,
929 @Volatile
930 @JvmField var isCompleting: Boolean,
931 @Volatile
932 @JvmField var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
Roman Elizarov4e33cc62018-09-05 01:15:44 +0300933 ) : SynchronizedObject(), Incomplete {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300934 @Volatile
935 private var _exceptionsHolder: Any? = null // Contains null | Throwable | ArrayList | SEALED
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300936
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300937 // NotE: cannot be modified when sealed
938 val isSealed: Boolean get() = _exceptionsHolder === SEALED
939 val isFailing: Boolean get() = rootCause != null
940 override val isActive: Boolean get() = !isFailing
941
942 // Seals current state and returns list of exceptions
Roman Elizarov563da402018-08-10 19:18:56 +0300943 // guarded by `synchronized(this)`
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300944 fun sealLocked(proposedException: Throwable?): List<Throwable> {
945 val eh = _exceptionsHolder // volatile read
946 val list = when(eh) {
947 null -> allocateList()
948 is Throwable -> allocateList().also { it.add(eh) }
949 is ArrayList<*> -> eh as ArrayList<Throwable>
950 else -> error("State is $eh") // already sealed -- cannot happen
951 }
952 val rootCause = this.rootCause // volatile read
953 rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
954 if (proposedException != null && proposedException != rootCause) list.add(proposedException)
955 _exceptionsHolder = SEALED
956 return list
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +0300957 }
958
Roman Elizarov563da402018-08-10 19:18:56 +0300959 // guarded by `synchronized(this)`
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300960 fun addExceptionLocked(exception: Throwable) {
961 val rootCause = this.rootCause // volatile read
962 if (rootCause == null) {
963 this.rootCause = exception
964 return
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300965 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300966 if (exception === rootCause) return // nothing to do
967 val eh = _exceptionsHolder // volatile read
968 when (eh) {
969 null -> _exceptionsHolder = exception
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +0300970 is Throwable -> {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300971 if (exception === eh) return // nothing to do
972 _exceptionsHolder = allocateList().apply {
973 add(eh)
Roman Elizarov563da402018-08-10 19:18:56 +0300974 add(exception)
975
976 }
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +0300977 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300978 is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception)
979 else -> error("State is $eh") // already sealed -- cannot happen
Vsevolod Tolstopyatov91ecee82018-08-07 18:24:00 +0300980 }
981 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300982
983 private fun allocateList() = ArrayList<Throwable>(4)
984
985 override fun toString(): String =
986 "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$_exceptionsHolder, list=$list]"
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300987 }
988
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300989 private val Incomplete.isFailing: Boolean
990 get() = this is Finishing && isFailing
991
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300992 private val Incomplete.isCancelling: Boolean
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300993 get() = this is Finishing && isCancelling
994
995 // Used by parent that is waiting for child completion
996 private class ChildCompletion(
997 private val parent: JobSupport,
998 private val state: Finishing,
999 private val child: ChildJob,
1000 private val proposedUpdate: Any?
1001 ) : JobNode<Job>(child.childJob) {
1002 override fun invoke(cause: Throwable?) {
1003 parent.continueCompleting(state, child, proposedUpdate)
1004 }
1005 override fun toString(): String =
1006 "ChildCompletion[$child, $proposedUpdate]"
1007 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001008
1009 /*
1010 * =================================================================================================
1011 * This is ready-to-use implementation for Deferred interface.
1012 * However, it is not type-safe. Conceptually it just exposes the value of the underlying
1013 * completed state as `Any?`
1014 * =================================================================================================
1015 */
1016
1017 public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
1018
1019 public fun getCompletionExceptionOrNull(): Throwable? {
1020 val state = this.state
1021 check(state !is Incomplete) { "This job has not completed yet" }
1022 return state.exceptionOrNull
1023 }
1024
1025 /**
1026 * @suppress **This is unstable API and it is subject to change.**
1027 */
1028 internal fun getCompletedInternal(): Any? {
1029 val state = this.state
1030 check(state !is Incomplete) { "This job has not completed yet" }
Roman Elizarov6d9f40f2018-04-28 14:44:02 +03001031 if (state is CompletedExceptionally) throw state.cause
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001032 return state
1033 }
1034
1035 /**
1036 * @suppress **This is unstable API and it is subject to change.**
1037 */
1038 internal suspend fun awaitInternal(): Any? {
1039 // fast-path -- check state (avoid extra object creation)
1040 while(true) { // lock-free loop on state
1041 val state = this.state
1042 if (state !is Incomplete) {
1043 // already complete -- just return result
Roman Elizarov6d9f40f2018-04-28 14:44:02 +03001044 if (state is CompletedExceptionally) throw state.cause
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001045 return state
1046
1047 }
1048 if (startInternal(state) >= 0) break // break unless needs to retry
1049 }
1050 return awaitSuspend() // slow-path
1051 }
1052
1053 private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +03001054 // We have to invoke await() handler only on cancellation, on completion we will be resumed regularly without handlers
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +03001055 cont.disposeOnCancellation(invokeOnCompletion {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001056 val state = this.state
1057 check(state !is Incomplete)
1058 if (state is CompletedExceptionally)
Roman Elizarov6d9f40f2018-04-28 14:44:02 +03001059 cont.resumeWithException(state.cause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001060 else
1061 cont.resume(state)
1062 })
1063 }
1064
1065 /**
1066 * @suppress **This is unstable API and it is subject to change.**
1067 */
1068 // registerSelectAwaitInternal
1069 @Suppress("UNCHECKED_CAST")
1070 internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) {
1071 // fast-path -- check state and select/return if needed
1072 loopOnState { state ->
1073 if (select.isSelected) return
1074 if (state !is Incomplete) {
1075 // already complete -- select result
1076 if (select.trySelect(null)) {
1077 if (state is CompletedExceptionally)
Roman Elizarov6d9f40f2018-04-28 14:44:02 +03001078 select.resumeSelectCancellableWithException(state.cause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001079 else
Roman Elizarov7587eba2018-07-25 12:22:46 +03001080 block.startCoroutineUnintercepted(state as T, select.completion)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001081 }
1082 return
1083 }
1084 if (startInternal(state) == 0) {
1085 // slow-path -- register waiter for completion
1086 select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(this, select, block).asHandler))
1087 return
1088 }
1089 }
1090 }
1091
1092 /**
1093 * @suppress **This is unstable API and it is subject to change.**
1094 */
1095 @Suppress("UNCHECKED_CAST")
1096 internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
1097 val state = this.state
1098 // Note: await is non-atomic (can be cancelled while dispatched)
1099 if (state is CompletedExceptionally)
Roman Elizarov6d9f40f2018-04-28 14:44:02 +03001100 select.resumeSelectCancellableWithException(state.cause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001101 else
1102 block.startCoroutineCancellable(state as T, select.completion)
1103 }
1104}
1105
Roman Elizarov563da402018-08-10 19:18:56 +03001106// --------------- helper classes & constants for job implementation
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001107
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001108private const val COMPLETING_ALREADY_COMPLETING = 0
1109private const val COMPLETING_COMPLETED = 1
1110private const val COMPLETING_WAITING_CHILDREN = 2
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001111private const val COMPLETING_RETRY = 3
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001112
1113private const val RETRY = -1
1114private const val FALSE = 0
1115private const val TRUE = 1
1116
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001117private val SEALED = Symbol("SEALED")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001118
Roman Elizarov563da402018-08-10 19:18:56 +03001119private val EMPTY_NEW = Empty(false)
1120private val EMPTY_ACTIVE = Empty(true)
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +03001121
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001122private class Empty(override val isActive: Boolean) : Incomplete {
1123 override val list: NodeList? get() = null
1124 override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
1125}
1126
1127internal class JobImpl(parent: Job? = null) : JobSupport(true) {
1128 init { initParentJobInternal(parent) }
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001129 override val onFailComplete get() = true
1130 override val handlesException: Boolean get() = false
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001131}
1132
1133// -------- invokeOnCompletion nodes
1134
1135internal interface Incomplete {
1136 val isActive: Boolean
1137 val list: NodeList? // is null only for Empty and JobNode incomplete state objects
1138}
1139
Roman Elizarov6d9f40f2018-04-28 14:44:02 +03001140internal abstract class JobNode<out J : Job>(
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001141 @JvmField val job: J
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +03001142) : CompletionHandlerBase(), DisposableHandle, Incomplete {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001143 override val isActive: Boolean get() = true
1144 override val list: NodeList? get() = null
1145 override fun dispose() = (job as JobSupport).removeNode(this)
1146}
1147
Roman Elizarovede29232018-09-18 12:53:09 +03001148internal class NodeList : LockFreeLinkedListHead(), Incomplete {
1149 override val isActive: Boolean get() = true
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001150 override val list: NodeList get() = this
1151
Roman Elizarovede29232018-09-18 12:53:09 +03001152 fun getString(state: String) = buildString {
1153 append("List{")
1154 append(state)
1155 append("}[")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001156 var first = true
1157 this@NodeList.forEach<JobNode<*>> { node ->
1158 if (first) first = false else append(", ")
1159 append(node)
1160 }
1161 append("]")
1162 }
Roman Elizarovede29232018-09-18 12:53:09 +03001163
1164 override fun toString(): String = getString("Active")
1165}
1166
1167internal class InactiveNodeList(
1168 override val list: NodeList
1169) : Incomplete {
1170 override val isActive: Boolean get() = false
1171 override fun toString(): String = list.getString("New")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001172}
1173
1174private class InvokeOnCompletion(
1175 job: Job,
1176 private val handler: CompletionHandler
1177) : JobNode<Job>(job) {
1178 override fun invoke(cause: Throwable?) = handler.invoke(cause)
1179 override fun toString() = "InvokeOnCompletion[$classSimpleName@$hexAddress]"
1180}
1181
1182private class ResumeOnCompletion(
1183 job: Job,
1184 private val continuation: Continuation<Unit>
1185) : JobNode<Job>(job) {
1186 override fun invoke(cause: Throwable?) = continuation.resume(Unit)
1187 override fun toString() = "ResumeOnCompletion[$continuation]"
1188}
1189
1190internal class DisposeOnCompletion(
1191 job: Job,
1192 private val handle: DisposableHandle
1193) : JobNode<Job>(job) {
1194 override fun invoke(cause: Throwable?) = handle.dispose()
1195 override fun toString(): String = "DisposeOnCompletion[$handle]"
1196}
1197
1198private class SelectJoinOnCompletion<R>(
1199 job: JobSupport,
1200 private val select: SelectInstance<R>,
1201 private val block: suspend () -> R
1202) : JobNode<JobSupport>(job) {
1203 override fun invoke(cause: Throwable?) {
1204 if (select.trySelect(null))
1205 block.startCoroutineCancellable(select.completion)
1206 }
1207 override fun toString(): String = "SelectJoinOnCompletion[$select]"
1208}
1209
1210private class SelectAwaitOnCompletion<T, R>(
1211 job: JobSupport,
1212 private val select: SelectInstance<R>,
1213 private val block: suspend (T) -> R
1214) : JobNode<JobSupport>(job) {
1215 override fun invoke(cause: Throwable?) {
1216 if (select.trySelect(null))
1217 job.selectAwaitCompletion(select, block)
1218 }
1219 override fun toString(): String = "SelectAwaitOnCompletion[$select]"
1220}
1221
1222// -------- invokeOnCancellation nodes
1223
1224/**
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001225 * Marker for node that shall be invoked on in _failing_ state.
1226 * **Note: may be invoked multiple times.**
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001227 */
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001228internal abstract class JobFailingNode<out J : Job>(job: J) : JobNode<J>(job)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001229
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001230private class InvokeOnFailing(
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001231 job: Job,
1232 private val handler: CompletionHandler
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001233) : JobFailingNode<Job>(job) {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001234 // delegate handler shall be invoked at most once, so here is an additional flag
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001235 private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001236 override fun invoke(cause: Throwable?) {
1237 if (_invoked.compareAndSet(0, 1)) handler.invoke(cause)
1238 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001239 override fun toString() = "InvokeOnFailing[$classSimpleName@$hexAddress]"
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001240}
1241
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +03001242internal class ChildJob(
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001243 parent: JobSupport,
1244 @JvmField val childJob: Job
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001245) : JobFailingNode<JobSupport>(parent) {
1246 override fun invoke(cause: Throwable?) = childJob.cancelChild(job)
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +03001247 override fun toString(): String = "ChildJob[$childJob]"
1248}
1249
1250// Same as ChildJob, but for cancellable continuation
1251internal class ChildContinuation(
1252 parent: Job,
1253 @JvmField val child: AbstractContinuation<*>
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001254) : JobFailingNode<Job>(parent) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +03001255 override fun invoke(cause: Throwable?) {
1256 child.cancel(job.getCancellationException())
1257 }
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001258 override fun toString(): String =
1259 "ChildContinuation[$child]"
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001260}
1261