blob: 33cc4d3ed6148465809afd08654fa38c0173c101 [file] [log] [blame]
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03001/*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03005package kotlinx.coroutines.experimental
6
7import kotlinx.atomicfu.*
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +03008import kotlinx.coroutines.experimental.NotInitialized.*
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03009import kotlinx.coroutines.experimental.internal.*
10import kotlinx.coroutines.experimental.internalAnnotations.*
11import kotlinx.coroutines.experimental.intrinsics.*
12import kotlinx.coroutines.experimental.selects.*
13import kotlin.coroutines.experimental.*
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030014
15/**
16 * A concrete implementation of [Job]. It is optionally a child to a parent job.
 * This job is cancelled when the parent is complete, but not vice versa.
18 *
 * This is an open class designed for extension by more specific classes that might augment the
 * state and may store additional state information for completed jobs, like their result values.
21 *
22 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
23 * @suppress **This is unstable API and it is subject to change.**
24 */
25internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 {
/** Context key of this element: always the [Job] key. */
final override val key: CoroutineContext.Key<*>
    get() = Job
27
28 /*
29 === Internal states ===
30
       name       state class    public state  description
       ------     ------------   ------------  -----------
       EMPTY_N    EmptyNew     : New           no listeners
       EMPTY_A    EmptyActive  : Active        no listeners
       SINGLE     JobNode      : Active        a single listener
       SINGLE+    JobNode      : Active        a single listener + NodeList added as its next
       LIST_N     NodeList     : New           a list of listeners (promoted once, does not go back to EmptyNew)
       LIST_A     NodeList     : Active        a list of listeners (promoted once, does not go back to JobNode/EmptyActive)
       COMPLETING Finishing    : Completing    has a list of listeners (promoted once from LIST_*)
       CANCELLING Finishing    : Cancelling    has a list of listeners (promoted once from LIST_*)
       FINAL_C    Cancelled    : Cancelled     cancelled (final state)
       FINAL_F    Failed       : Completed     failed for other reason (final state)
       FINAL_R    <any>        : Completed     produced some result
44
45 === Transitions ===
46
       New states      Active states       Inactive states

      +---------+       +---------+                          }
      | EMPTY_N | --+-> | EMPTY_A | ----+                    } Empty states
      +---------+   |   +---------+     |                    }
           |        |     |     ^       |    +----------+
           |        |     |     |       +--> |  FINAL_* |
           |        |     V     |       |    +----------+
           |        |   +---------+     |                    }
           |        |   | SINGLE  | ----+                    } JobNode states
           |        |   +---------+     |                    }
           |        |        |          |                    }
           |        |        V          |                    }
           |        |   +---------+     |                    }
           |        +-- | SINGLE+ | ----+                    }
           |            +---------+     |                    }
           |                 |          |
           V                 V          |
      +---------+       +---------+     |                    }
      | LIST_N  | ----> | LIST_A  | ----+                    } NodeList states
      +---------+       +---------+     |                    }
         |   |             |   |        |
         |   |    +--------+   |        |
         |   |    |            V        |
         |   |    |    +------------+   |   +------------+   }
         |   +-------> | COMPLETING | --+-- | CANCELLING |   } Finishing states
         |        |    +------------+       +------------+   }
         |        |         |                    ^
         |        |         |                    |
         +--------+---------+--------------------+
77
78
79 This state machine and its transition matrix are optimized for the common case when job is created in active
80 state (EMPTY_A) and at most one completion listener is added to it during its life-time.
81
82 Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
83 */
84
// The initial state is one of the shared objects (EmptyActive / EmptyNew), chosen by `active`;
// they are used for as long as the job has no listeners.
private val _state = atomic<Any?>(if (active) EmptyActive else EmptyNew)

// Registration handle obtained from the parent job. It is `null` until initParentJobInternal
// assigns it, and is replaced with NonDisposableHandle once it has been disposed.
@Volatile
private var parentHandle: DisposableHandle? = null
90
91 // ------------ initialization ------------
92
/**
 * Attaches this job to its [parent] (if any).
 * Must be called at most once, after construction and after all other initialization;
 * a second call fails the `check` on the parent handle.
 *
 * @param parent the parent job, or `null` when this job has no parent.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun initParentJobInternal(parent: Job?) {
    check(parentHandle == null)
    if (parent != null) {
        // the parent has to be started before a child is attached to it
        parent.start()
        @Suppress("DEPRECATION")
        val registration = parent.attachChild(this)
        parentHandle = registration
        // Re-check our own state only _after_ the handle is published
        // (see the order of actions in tryFinalizeState)
        if (isCompleted) {
            registration.dispose()
            // drop the reference just in case, to aid GC
            parentHandle = NonDisposableHandle
        }
    } else {
        parentHandle = NonDisposableHandle
    }
}
114
115 // ------------ state query ------------
116
/**
 * Current state of this job.
 * If the raw state holds an [OpDescriptor] (an in-progress atomic operation), the operation is
 * helped to complete via `perform` and the state is read again.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal val state: Any?
    get() {
        _state.loop { current ->
            if (current is OpDescriptor) {
                // help the pending atomic operation, then re-read
                current.perform(this)
            } else {
                return current
            }
        }
    }
127
/**
 * Invokes [block] with a freshly read [state] over and over again.
 * The loop never terminates on its own; callers leave it with a non-local `return` from [block].
 */
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
    while (true) block(state)
}
136
/** `true` when the current state is [Incomplete] and reports itself as active. */
public final override val isActive: Boolean
    get() {
        val current = state as? Incomplete ?: return false
        return current.isActive
    }
141
/** `true` as soon as the current state is no longer [Incomplete]. */
public final override val isCompleted: Boolean
    get() = when (state) {
        is Incomplete -> false
        else -> true
    }
143
/**
 * `true` when the job is in the final [Cancelled] state, or is [Finishing] with a
 * cancellation cause already recorded.
 */
public final override val isCancelled: Boolean
    get() {
        val current = state
        return when (current) {
            is Cancelled -> true
            is Finishing -> current.cancelled != null
            else -> false
        }
    }
148
149 // ------------ state update ------------
150
151 /**
Vsevolod Tolstopyatov3bda22c2018-07-20 16:14:49 +0300152 * Updates current [state] of this job to the final state, invoking all necessary handlers
153 * and/or `on*` methods.
154 *
155 * Returns `false` if current state is not equal to expected.
156 * If this method succeeds, state of this job will never be changed again
157 */
158 private fun tryFinalizeState(expect: Incomplete, proposedUpdate: Any?, mode: Int): Boolean {
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +0300159 if (expect is Finishing && expect.cancelled != null) {
160 return tryFinalizeCancellingState(expect, proposedUpdate, mode)
161 }
162
163 val update = coerceProposedUpdate(expect, proposedUpdate)
164 if (!tryFinalizeState(expect, update)) return false
165 if (update is CompletedExceptionally) handleJobException(update.cause)
166 completeStateFinalization(expect, update, mode)
167 return true
168 }
169
/**
 * Finalizes a job that is in the 'cancelling' state ([expect] has a cancellation recorded).
 *
 * The steps are:
 * 1) Synchronize on the state object to avoid races with concurrent mutations
 *    (e.g. when a new child is added).
 * 2) Under the lock, check that the job is still in the expected state.
 * 3) Aggregate the final exception under the same lock that protects the exceptions collection.
 * 4) Pass it upstream.
 *
 * @return `false` if the state changed before the lock was taken, `true` after finalization.
 * @throws AssertionError if the final compare-and-set unexpectedly fails.
 */
private fun tryFinalizeCancellingState(expect: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
    val finalException = synchronized(expect) {
        if (_state.value !== expect) return false

        (proposedUpdate as? CompletedExceptionally)?.let { expect.addLocked(it.cause) }

        // New exceptions cannot be added concurrently: the state is guarded by the lock and the
        // storage is sealed right after aggregation, so later exceptions are reported separately.
        val aggregated = buildException(expect)
        expect.seal()
        aggregated
    }

    val update = Cancelled(this, finalException ?: expect.cancelled!!.cause)
    handleJobException(update.cause)

    // This CAS is expected to always succeed: the state is already sealed, so no jobs can be attached
    val finalized = tryFinalizeState(expect, update)
    if (!finalized) {
        val error = AssertionError("Unexpected state: ${_state.value}, expected: $expect, update: $update")
        handleOnCompletionException(error)
        throw error
    }

    completeStateFinalization(expect, update, mode)
    return true
}
208
/**
 * Builds the final exception for a cancelling [state]: picks a root cause and attaches the
 * remaining collected exceptions to it as suppressed ones.
 *
 * Known API limitation: an internal [JobCancellationException] coming from the parent cannot be
 * told apart from external cancellation, so all exceptions have to be collected. As a
 * consequence the same exception may be added as suppressed more than once when parent and
 * child throw simultaneously:
 * 1) Child throws E1 and parent throws E2.
 * 2) Parent goes to "Cancelling(E1)" and cancels child with E1.
 * 3) Child goes to "Cancelling(E1)", but throws an exception E2.
 * 4) When the child throws, it notifies the parent that it is cancelling, adding its exception
 *    to the parent's list (the parent does not know whether it is a child or external exception).
 * 5) Child builds its final exception: E1 with suppressed E2, and reports it to the parent.
 * 6) Parent aggregates three exceptions: original E1, reported E2 and the "final" E1.
 *    It filters out the third one, but adds the second to the first, producing a duplicate.
 *
 * @return the root cause with suppressed exceptions attached. If the cancellation cause is a
 *         [JobCancellationException] and neither it nor any collected exception unwraps to a
 *         real cause, that cancellation exception itself is returned.
 */
private fun buildException(state: Finishing): Throwable? {
    val cancelled = state.cancelled!!
    val suppressed = state.exceptions

    val original = cancelled.cause
    val rootCause = if (original is JobCancellationException) {
        unwrap(original)
            ?: suppressed.firstOrNull { unwrap(it) != null }
            ?: return original
    } else {
        original
    }

    val seen = HashSet<Throwable>() // TODO it should be identity set
    suppressed.forEach { candidate ->
        val unwrapped = unwrap(candidate)
        if (unwrapped !== null && unwrapped !== rootCause && seen.add(unwrapped)) {
            rootCause.addSuppressedThrowable(unwrapped)
        }
    }

    return rootCause
}
253
/**
 * Strips nested [JobCancellationException] wrappers from [exception].
 *
 * @return the first cause in the chain that is not a [JobCancellationException],
 *         or `null` when the chain ends with a cancellation exception that has no cause.
 */
private tailrec fun unwrap(exception: Throwable): Throwable? {
    if (exception !is JobCancellationException) return exception
    val nested = exception.cause ?: return null
    return unwrap(nested)
}
264
/**
 * Tries to switch [_state] from [expect] to the final [update] and, on success,
 * disposes the parent handle (detaching this child from its parent).
 *
 * @return `true` when the compare-and-set succeeded; the caller then continues
 *         in [completeStateFinalization].
 */
private fun tryFinalizeState(expect: Incomplete, update: Any?): Boolean {
    require(update !is Incomplete) // only incomplete -> completed transition is allowed
    if (_state.compareAndSet(expect, update)) {
        // Unregister from the parent job; parentHandle is read (volatile) _after_ the state update
        val handle = parentHandle
        if (handle != null) {
            handle.dispose()
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
        return true
    }
    return false
}
279
/**
 * Completes the update of this job's state after [_state] was switched from [expect] to [update].
 *
 * Order of actions:
 * 1) [onCancellationInternal]: exception handling, parent/resource cancellation etc.
 *    Skipped when [expect] was already cancelling, so that cancellation is notified only once.
 * 2) Completion handlers: `join()`, callbacks etc. They must run only AFTER exception handling, see #208.
 * 3) [onCompletionInternal]: onNext(), timeout deregistration etc. It runs last so that all callbacks
 *    observe a consistent state of the job that does not depend on callback scheduling.
 */
private fun completeStateFinalization(expect: Incomplete, update: Any?, mode: Int) {
    val exceptionally = update as? CompletedExceptionally
    if (!expect.isCancelling) onCancellationInternal(exceptionally)

    val cause = exceptionally?.cause
    if (expect !is JobNode<*>) {
        // list-based states: notify every registered completion handler
        expect.list?.notifyCompletion(cause)
    } else {
        // SINGLE/SINGLE+ state -- one completion handler (common case)
        try {
            expect.invoke(cause)
        } catch (ex: Throwable) {
            handleOnCompletionException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
        }
    }

    onCompletionInternal(update, mode)
}
311
/**
 * A job in the Cancelling state can only be promoted to the Cancelled state.
 * If [proposedUpdate] is not an appropriate [Cancelled] (one preserving the cancellation cause),
 * the corresponding [Cancelled] state is constructed instead; in all other cases
 * [proposedUpdate] is returned unchanged.
 */
private fun coerceProposedUpdate(expect: Incomplete, proposedUpdate: Any?): Any? {
    val cancelled = (expect as? Finishing)?.cancelled ?: return proposedUpdate
    if (isCorrespondinglyCancelled(cancelled, proposedUpdate)) return proposedUpdate
    return createCancelled(cancelled, proposedUpdate)
}
318
/**
 * Checks whether [proposedUpdate] is a [Cancelled] state whose cause either equals the cause of
 * [cancelled] or is a [JobCancellationException].
 */
private fun isCorrespondinglyCancelled(cancelled: Cancelled, proposedUpdate: Any?): Boolean =
    // NOTE: causes are compared with equals() by design, see equals of JobCancellationException
    proposedUpdate is Cancelled &&
        (proposedUpdate.cause == cancelled.cause || proposedUpdate.cause is JobCancellationException)
324
/**
 * Produces the [Cancelled] state to finish with, given the recorded [cancelled] state and the
 * [proposedUpdate].
 *
 * @return [cancelled] itself when the update is not exceptional or carries an equal cause;
 *         otherwise a new [Cancelled] built around the update's exception.
 */
private fun createCancelled(cancelled: Cancelled, proposedUpdate: Any?): Cancelled {
    // not an exception -- just use the original cancelled
    val failed = proposedUpdate as? CompletedExceptionally ?: return cancelled
    val exception = failed.cause
    val original = cancelled.cause
    if (original == exception) return cancelled // that is the cancelled we need already!
    // The exception could have occurred while the coroutine was being cancelled.
    // Do not spam with JCE in suppressed exceptions.
    if (original !is JobCancellationException) {
        exception.addSuppressedThrowable(original)
    }
    return Cancelled(this, exception)
}
336
/** Invokes every [JobNode] handler in this list with [cause]. */
private fun NodeList.notifyCompletion(cause: Throwable?) {
    notifyHandlers<JobNode<*>>(this, cause)
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300339
/** Invokes only the [JobCancellationNode] handlers of [list] with [cause]. */
private fun notifyCancellation(list: NodeList, cause: Throwable?) {
    notifyHandlers<JobCancellationNode<*>>(list, cause)
}
Vsevolod Tolstopyatovb10287e2018-07-03 11:11:21 +0300342
/**
 * Invokes every node of type [T] in [list] with [cause].
 *
 * A throwing handler does not stop the iteration: the first failure is wrapped into a
 * [CompletionHandlerException], subsequent failures are attached to it as suppressed, and the
 * resulting exception (if any) is passed to [handleOnCompletionException] after all handlers ran.
 */
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
    var exception: Throwable? = null
    list.forEach<T> { node ->
        try {
            node.invoke(cause)
        } catch (ex: Throwable) {
            val first = exception
            if (first == null) {
                exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
            } else {
                first.addSuppressedThrowable(ex)
            }
        }
    }
    val failure = exception
    if (failure != null) handleOnCompletionException(failure)
}
356
/**
 * Starts this job if it is still in a new state.
 *
 * @return `true` if this call started the job, `false` if it was not in a new state.
 */
public final override fun start(): Boolean {
    while (true) {
        val outcome = startInternal(state)
        if (outcome == FALSE) return false
        if (outcome == TRUE) return true
        // any other outcome (RETRY) -- re-read the state and try again
    }
}
365
/**
 * Single attempt to start the job from the given [state].
 *
 * @return `FALSE` when the state is not new, `TRUE` when the job was started
 *         ([onStartInternal] has been invoked), `RETRY` when the attempt has to be repeated.
 */
private fun startInternal(state: Any?): Int {
    if (state is Empty) { // EMPTY_X state -- no completion handlers
        if (state.isActive) return FALSE // already active
        if (_state.compareAndSet(state, EmptyActive)) {
            onStartInternal()
            return TRUE
        }
        return RETRY
    }
    if (state is NodeList) { // LIST -- a list of completion handlers (either new or active)
        val result = state.tryMakeActive()
        if (result == TRUE) onStartInternal()
        return result
    }
    return FALSE // not a new state
}
386
/**
 * Hook for subclasses that provides the actual [start] action.
 * It is invoked exactly once, when a non-active coroutine is [started][start].
 * The default implementation does nothing.
 */
internal open fun onStartInternal() {
}
392
/**
 * Returns a [CancellationException] describing how this job was cancelled or completed.
 *
 * @throws IllegalStateException if the job is still incomplete and no cancellation is recorded.
 */
public final override fun getCancellationException(): CancellationException {
    val current = state
    if (current is Finishing) {
        val cancelled = current.cancelled
        if (cancelled != null) return cancelled.cause.toCancellationException("Job is being cancelled")
    }
    if (current is Incomplete) error("Job was not completed or cancelled yet: $this")
    if (current is CompletedExceptionally) return current.cause.toCancellationException("Job has failed")
    return JobCancellationException("Job has completed normally", null, this)
}
405
/**
 * Returns this throwable as is when it already is a [CancellationException]; otherwise wraps it
 * into a [JobCancellationException] with the given [message], this throwable as the cause and
 * this job as the job.
 */
private fun Throwable.toCancellationException(message: String): CancellationException {
    if (this is CancellationException) return this
    return JobCancellationException(message, this, this@JobSupport)
}
408
/**
 * Returns the cause that signals the completion of this job -- the original [cancel] cause,
 * a [JobCancellationException], or **`null` if this job had completed normally**.
 *
 * @throws IllegalStateException when invoked for a job that has neither [completed][isCompleted]
 *         nor been [cancelled][isCancelled] yet.
 */
protected fun getCompletionCause(): Throwable? {
    val current = state
    if (current is Finishing) {
        val cancelled = current.cancelled
        if (cancelled != null) return cancelled.cause
    }
    if (current is Incomplete) error("Job was not completed or cancelled yet")
    return if (current is CompletedExceptionally) current.cause else null
}
424
/** Registers [handler] for completion only; an already completed job invokes it immediately. */
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)
}
428
/** Same as the primary overload, with `invokeImmediately` fixed to `true`. */
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle {
    return invokeOnCompletion(onCancelling = onCancelling, invokeImmediately = true, handler = handler)
}
432
/** Same as the primary overload, with `invokeImmediately` fixed to `true`. */
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(onCancelling_: Boolean, handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onCancelling = onCancelling_, invokeImmediately = true, handler = handler)
}
436
/**
 * Registers [handler] to be invoked when this job completes or, if [onCancelling] is `true`,
 * when it starts cancelling.
 *
 * @param onCancelling when `true`, a cancellation node is registered instead of a completion node.
 * @param invokeImmediately when `true` and the job is already in the target state (cancelling for
 *        cancellation handlers, or complete), the handler is invoked right away.
 * @param handler the handler to register.
 * @return the registered node, or [NonDisposableHandle] when the job was already in the target
 *         state and nothing was registered.
 */
// todo: non-final as a workaround for KT-21968, should be final in the future
public override fun invokeOnCompletion(
    onCancelling: Boolean,
    invokeImmediately: Boolean,
    handler: CompletionHandler
): DisposableHandle {
    // the node is created lazily and reused across retries of the lock-free loop
    var nodeCache: JobNode<*>? = null
    loopOnState { state ->
        when (state) {
            is Empty -> { // EMPTY_X state -- no completion handlers
                if (state.isActive) {
                    // try move to SINGLE state
                    val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                    if (_state.compareAndSet(state, node)) return node
                } else
                    promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
            }
            is Incomplete -> {
                val list = state.list
                if (list == null) { // SINGLE/SINGLE+
                    promoteSingleToNodeList(state as JobNode<*>)
                } else {
                    if (state is Finishing && state.cancelled != null && onCancelling) {
                        // installing cancellation handler on job that is being cancelled
                        // :KLUDGE: the handler is invoked via the platform-specific `invokeIt` extension here
                        // as well, because on Kotlin/JS a handler is not necessarily a function
                        if (invokeImmediately) handler.invokeIt(state.cancelled.cause)
                        return NonDisposableHandle
                    }
                    val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                    if (addLastAtomic(state, list, node)) return node
                }
            }
            else -> { // is complete
                // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
                return NonDisposableHandle
            }
        }
    }
}
477
/**
 * Turns [handler] into a list node for this job.
 * A handler that already is a node of the matching kind is reused (after verifying that it
 * belongs to this job); any other handler is wrapped into [InvokeOnCancellation] or
 * [InvokeOnCompletion] depending on [onCancelling].
 */
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
    if (onCancelling) {
        val existing = handler as? JobCancellationNode<*> ?: return InvokeOnCancellation(this, handler)
        require(existing.job === this)
        return existing
    }
    val existing = handler as? JobNode<*> ?: return InvokeOnCompletion(this, handler)
    require(existing.job === this && existing !is JobCancellationNode)
    return existing
}
486
/** Appends [node] to [list] on the condition that the job state is still [expect]. */
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>): Boolean {
    return list.addLastIf(node) { this.state === expect }
}
489
/**
 * Tries to replace the empty [state] with a fresh [NodeList] that keeps the same active flag.
 * The result of the compare-and-set is ignored; callers re-read the state on their next pass.
 */
private fun promoteEmptyToNodeList(state: Empty) {
    val promoted = NodeList(state.isActive)
    _state.compareAndSet(state, promoted)
}
494
/**
 * Tries to promote a single-listener [state] to a list of listeners.
 * The result of the final compare-and-set is ignored; callers continue their lock-free loop.
 */
private fun promoteSingleToNodeList(state: JobNode<*>) {
    // attach a list after the node (SINGLE+ state)
    state.addOneIfEmpty(NodeList(active = true))
    // Now it is either in SINGLE+ state or the state has changed (the node could have been removed).
    // The next node is either our NodeList or the one installed by whoever won the race;
    // just attempt to make it the state if the state is still the same.
    _state.compareAndSet(state, state.nextNode)
}
503
/**
 * Waits for this job to complete.
 * When the job is already complete the call does not suspend; it only runs `checkCompletion`
 * on the caller's context (presumably to surface the caller's own cancellation -- confirm
 * against `checkCompletion`).
 */
public final override suspend fun join() {
    if (joinInternal()) return joinSuspend() // slow-path wait
    // fast-path: nothing to wait for, do not suspend
    coroutineContext.checkCompletion()
}
511
/**
 * Makes sure the job is started and tells whether the caller has to wait for it.
 *
 * @return `false` when the job is already complete, `true` when it is incomplete and waiting is needed.
 */
private fun joinInternal(): Boolean {
    while (true) {
        val current = state
        if (current !is Incomplete) return false // not active anymore (complete) -- no need to wait
        // a negative result means the start attempt must be retried; anything else means "wait"
        if (startInternal(current) >= 0) return true
    }
}
518
/** Slow path of [join]: suspends the caller until a completion handler resumes it. */
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
    val waiter = ResumeOnCompletion(this, cont)
    val registration = invokeOnCompletion(handler = waiter.asHandler)
    // The join() handler has to be disposed only on cancellation of the waiter;
    // on completion we will be resumed regularly without handlers
    cont.disposeOnCancellation(registration)
}
523
/** The job itself serves as the select clause for [join]. */
public final override val onJoin: SelectClause0 get() = this
526
/**
 * Registers the `onJoin` clause of a select expression.
 * If the job is already complete the clause is selected immediately; otherwise the job is
 * started if needed and a completion handler is registered for the select instance.
 */
public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
    // fast-path -- check state and select/return if needed
    while (true) {
        val current = state
        if (select.isSelected) return
        if (current !is Incomplete) {
            // already complete -- select result
            if (select.trySelect(null)) {
                select.completion.context.checkCompletion() // always check for our completion
                block.startCoroutineUndispatched(select.completion)
            }
            return
        }
        // 0 is taken to be the "already started" result of startInternal -- TODO confirm it equals FALSE
        if (startInternal(current) == 0) {
            // slow-path -- register waiter for completion
            val waiter = SelectJoinOnCompletion(this, select, block)
            select.disposeOnSelect(invokeOnCompletion(handler = waiter.asHandler))
            return
        }
    }
}
547
/**
 * Removes a previously registered [node]; how it is done depends on the state of the job.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun removeNode(node: JobNode<*>) {
    while (true) {
        val current = state
        if (current !is JobNode<*>) {
            // Either an incomplete state that may have a list of completion handlers (remove the
            // node from the list if there is one), or a complete state that has no handlers at all
            if (current is Incomplete && current.list != null) node.remove()
            return
        }
        // SINGLE/SINGLE+ state -- one completion handler
        if (current !== node) return // a different job node --> we were already removed
        // try to remove and revert back to the empty state
        if (_state.compareAndSet(current, EmptyActive)) return
    }
}
569
/**
 * Selects what [cancel] does; the default is `ON_CANCEL_MAKE_CANCELLING`.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open val onCancelMode: Int
    get() = ON_CANCEL_MAKE_CANCELLING
574
/**
 * Cancels this job with an optional [cause], dispatching on [onCancelMode].
 *
 * @throws IllegalStateException when [onCancelMode] has an unsupported value.
 */
public override fun cancel(cause: Throwable?): Boolean {
    return when (onCancelMode) {
        ON_CANCEL_MAKE_CANCELLING -> makeCancelling(cause)
        ON_CANCEL_MAKE_COMPLETING -> makeCompleting(Cancelled(this, cause))
        else -> error("Invalid onCancelMode $onCancelMode")
    }
}
580
/**
 * Tries to finalize [state] straight into a [Cancelled] state built from [cause].
 * The coroutine will be dispatched to process its cancellation exception, so there is no need
 * for an extra check of the Job status in MODE_CANCELLABLE.
 */
private fun updateStateCancelled(state: Incomplete, cause: Throwable?): Boolean {
    val cancelled = Cancelled(this, cause)
    return tryFinalizeState(state, cancelled, mode = MODE_ATOMIC_DEFAULT)
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300585
/**
 * Transitions this job to the Cancelling state (or directly to Cancelled when it was never started).
 *
 * @return `false` when the job is already inactive. When it is already cancelling, `true` for a
 *         `null` [cause], otherwise the result of adding [cause] to the state's exceptions.
 */
private fun makeCancelling(cause: Throwable?): Boolean {
    loopOnState { state ->
        when (state) {
            is Empty -> { // EMPTY_X state -- no completion handlers
                if (!state.isActive) {
                    // cancelling a non-started coroutine makes it immediately cancelled
                    // (and we have no listeners to notify which makes it very simple)
                    if (updateStateCancelled(state, cause)) return true
                } else {
                    promoteEmptyToNodeList(state) // this way it can be wrapped into Cancelling on next pass
                }
            }
            is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
                promoteSingleToNodeList(state)
            }
            is NodeList -> { // LIST -- a list of completion handlers (either new or active)
                val done = if (state.isActive) {
                    tryMakeCancelling(state, state.list, cause)
                } else {
                    // cancelling a non-started coroutine makes it immediately cancelled
                    updateStateCancelled(state, cause)
                }
                if (done) return true
            }
            is Finishing -> { // Completing/Cancelling the job, may cancel
                if (state.cancelled == null) {
                    if (tryMakeCancelling(state, state.list, cause)) return true
                } else {
                    // Already cancelling: either the exception is successfully added
                    // or the caller should handle it itself
                    return if (cause == null) true else state.addException(cause)
                }
            }
            else -> { // is inactive
                return false
            }
        }
    }
}
629
/**
 * Tries to move the job from [expect] into the cancelling state, on the condition that it is
 * still in [expect]; on success runs the finishing/cancellation hooks and notifies the
 * cancellation handlers in [list].
 *
 * @return `true` when the state was switched, `false` when the state had changed concurrently.
 */
private fun tryMakeCancelling(expect: Incomplete, list: NodeList, cause: Throwable?): Boolean {
    val cancelled = Cancelled(this, cause)
    val cancelling = Finishing(list, cancelled, false)
    if (_state.compareAndSet(expect, cancelling)) {
        onFinishingInternal(cancelled)
        onCancellationInternal(cancelled)
        // reading cancelled.cause materializes the cause
        notifyCancellation(list, cancelled.cause)
        return true
    }
    return false
}
640
/**
 * Starts completing this job with [proposedUpdate].
 *
 * @return `false` only when the job was already complete or completing, `true` otherwise.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun makeCompleting(proposedUpdate: Any?): Boolean =
    makeCompletingInternal(proposedUpdate, mode = MODE_ATOMIC_DEFAULT) != COMPLETING_ALREADY_COMPLETING
649
/**
 * Like [makeCompleting], but fails when the job is already complete or completing.
 *
 * Returns:
 * * `true` if state was updated to completed/cancelled;
 * * `false` if made completing or it is cancelling and is waiting for children.
 *
 * @throws IllegalStateException if job is already complete or completing
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean {
    val outcome = makeCompletingInternal(proposedUpdate, mode)
    if (outcome == COMPLETING_COMPLETED) return true
    if (outcome == COMPLETING_WAITING_CHILDREN) return false
    throw IllegalStateException("Job $this is already complete or completing, " +
        "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
}
665
/**
 * Core of [makeCompleting] / [makeCompletingOnce].
 *
 * @return `COMPLETING_ALREADY_COMPLETING` when the job is complete or already completing,
 *         `COMPLETING_COMPLETED` when the final state was installed,
 *         `COMPLETING_WAITING_CHILDREN` when the job now waits for a child to complete.
 */
private fun makeCompletingInternal(proposedUpdate: Any?, mode: Int): Int {
    loopOnState { state ->
        if (state !is Incomplete)
            return COMPLETING_ALREADY_COMPLETING
        if (state is Finishing && state.completing)
            return COMPLETING_ALREADY_COMPLETING

        val child: ChildJob? = firstChild(state)
        if (child == null) {
            // no children: complete immediately, unless there is an onFinishing handler to run first
            val mustRunFinishingHandler = state !is Finishing && hasOnFinishingHandler(proposedUpdate)
            if (!mustRunFinishingHandler) {
                if (tryFinalizeState(state, proposedUpdate, mode)) return COMPLETING_COMPLETED
                return@loopOnState // retry
            }
        }

        // the state must be promoted to a list to correctly operate on child lists
        val list = state.list
        if (list == null) {
            when (state) {
                is Empty -> promoteEmptyToNodeList(state)
                is JobNode<*> -> promoteSingleToNodeList(state)
                else -> error("Unexpected state with an empty list: $state")
            }
            return@loopOnState // retry
        }

        // cancel all children in list on exceptional completion
        if (proposedUpdate is CompletedExceptionally)
            child?.cancelChildrenInternal(proposedUpdate.cause)

        // switch to completing state
        val cancelled = (state as? Finishing)?.cancelled ?: (proposedUpdate as? Cancelled)
        val completing = Finishing(list, cancelled, true)
        if (_state.compareAndSet(state, completing)) {
            if (state !is Finishing) onFinishingInternal(proposedUpdate)
            if (child != null && tryWaitForChild(child, proposedUpdate))
                return COMPLETING_WAITING_CHILDREN
            // NOTE(review): the caller-supplied `mode` is not used on this path -- confirm this is intentional
            if (tryFinalizeState(completing, proposedUpdate, mode = MODE_ATOMIC_DEFAULT))
                return COMPLETING_COMPLETED
        }
    }
}
705
/**
 * Cancels the job of this child node and of every child node that follows it (as found by [nextChild]),
 * wrapping [cause] into a [JobCancellationException].
 */
private fun ChildJob.cancelChildrenInternal(cause: Throwable) {
    var node: ChildJob? = this
    while (node != null) {
        val target = node.childJob
        target.cancel(JobCancellationException("Child job was cancelled because of parent failure", cause, target))
        node = node.nextChild()
    }
}
710
// The cause of an exceptionally completed state, or null for any other value
private val Any?.exceptionOrNull: Throwable?
    get() = if (this is CompletedExceptionally) this.cause else null
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300713
// The state itself when it is a single ChildJob node, otherwise the first child found in its list (if any)
private fun firstChild(state: Incomplete): ChildJob? =
    if (state is ChildJob) state else state.list?.nextChild()
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300716
// Walks the children starting at [child] and installs a ChildCompletion handler on the first one
// that is still incomplete. Returns false when there is no more incomplete children to wait.
private fun tryWaitForChild(child: ChildJob, proposedUpdate: Any?): Boolean {
    var candidate = child
    while (true) {
        val completion = ChildCompletion(this, candidate, proposedUpdate)
        val handle = candidate.childJob.invokeOnCompletion(invokeImmediately = false, handler = completion.asHandler)
        // any handle other than NonDisposableHandle means the child is not complete and we've started waiting for it
        if (handle !== NonDisposableHandle) return true
        candidate = candidate.nextChild() ?: return false
    }
}
725
/**
 * Continues completion of this job after [lastChild] has completed: waits for the next incomplete
 * child, if there is one, and otherwise tries to finalize the state with [proposedUpdate].
 *
 * @param lastChild the child node whose completion triggered this call.
 * @param proposedUpdate the state this job is completing with.
 * @throws IllegalStateException if this job is not in the `Finishing` state.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun continueCompleting(lastChild: ChildJob, proposedUpdate: Any?) {
    loopOnState { state ->
        if (state !is Finishing)
            throw IllegalStateException("Job $this is found in unexpected state while completing with $proposedUpdate", proposedUpdate.exceptionOrNull)
        // figure out if we need to wait for next child (re-evaluated on every iteration of the loop)
        val waitChild = lastChild.nextChild()
        // try wait for next child
        if (waitChild != null && tryWaitForChild(waitChild, proposedUpdate)) return // waiting for next child
        // no more children to wait -- try update state
        if (tryFinalizeState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return
    }
}
741
// Finds the next non-removed ChildJob after this node, or null once the list head is reached
private fun LockFreeLinkedListNode.nextChild(): ChildJob? {
    var node = this
    // rollback to prev non-removed (or list head)
    while (node.isRemoved) node = node.prevNode
    while (true) {
        node = node.nextNode
        if (node.isRemoved) continue
        when (node) {
            is ChildJob -> return node
            is NodeList -> return null // checked all -- no more children
        }
    }
}
752
// Lazily yields the jobs of all ChildJob nodes found in the state observed when iteration starts
public final override val children: Sequence<Job> get() = buildSequence {
    val snapshot = this@JobSupport.state
    if (snapshot is ChildJob) {
        yield(snapshot.childJob)
    } else if (snapshot is Incomplete) {
        snapshot.list?.forEach<ChildJob> { node -> yield(node.childJob) }
    }
}
762
// Registers [child] as a ChildJob node that is also invoked on cancelling
@Suppress("OverridingDeprecatedMember")
public final override fun attachChild(child: Job): DisposableHandle {
    val node = ChildJob(this, child)
    return invokeOnCompletion(onCancelling = true, handler = node.asHandler)
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300766
// Overrides the deprecated member by forwarding to `cancelChildren` with the same [cause].
// NOTE(review): the call is meant to bind to the extension function (see the inline comment); that
// relies on how the member is declared in the overridden interface, which is not visible here -- confirm.
@Suppress("OverridingDeprecatedMember")
public final override fun cancelChildren(cause: Throwable?) {
    this.cancelChildren(cause) // use extension function
}
771
/**
 * Override to process any exceptions that were encountered while invoking completion handlers
 * installed via [invokeOnCompletion].
 *
 * The default implementation rethrows [exception].
 *
 * @param exception the exception to process.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun handleOnCompletionException(exception: Throwable) {
    throw exception
}
780
/**
 * This function is invoked once when the job is cancelled or is completed.
 * It's an optimization for [invokeOnCompletion] with `onCancelling` set to `true`.
 *
 * The default implementation does nothing.
 *
 * @param exceptionally not null when the job was cancelled or completed exceptionally,
 *        null when it has completed normally.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
    // TODO rename to "onCancelling"
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300792
/**
 * Whether job has [onFinishingInternal] handler for given [update].
 *
 * When this returns `true` for a job that has no children and is not yet finishing,
 * [makeCompletingInternal] switches to the completing state first instead of finalizing immediately.
 * The default implementation returns `false`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun hasOnFinishingHandler(update: Any?) = false
798
/**
 * Hook that [makeCompletingInternal] calls with the proposed [update] right after it has switched a job
 * that was not finishing before into the completing state. The default implementation does nothing.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun onFinishingInternal(update: Any?) {}
803
/**
 * Method which is invoked once the Job becomes `Cancelled`. It's guaranteed that at the moment
 * of invocation the job and all its children are complete.
 *
 * The default implementation does nothing.
 *
 * @param exception the exception to handle.
 */
internal open fun handleJobException(exception: Throwable) {}
809
/**
 * Override for post-completion actions that need to do something with the state.
 * The default implementation does nothing.
 *
 * @param state the state passed by the caller of this hook.
 * @param mode completion mode.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun onCompletionInternal(state: Any?, mode: Int) {}
816
// for nicer debugging: name{state}@address
public override fun toString(): String {
    val name = nameString()
    val state = stateString()
    return "$name{$state}@$hexAddress"
}
820
/**
 * Name of this job as it appears in [toString]; defaults to `classSimpleName`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun nameString(): String = classSimpleName
825
// Human-readable name of the current state, used by toString
private fun stateString(): String {
    val snapshot = this.state
    if (snapshot is Finishing) {
        // both flags may be set at once, in which case the two words are concatenated
        return buildString {
            if (snapshot.cancelled != null) append("Cancelling")
            if (snapshot.completing) append("Completing")
        }
    }
    if (snapshot is Incomplete) return if (snapshot.isActive) "Active" else "New"
    if (snapshot is Cancelled) return "Cancelled"
    if (snapshot is CompletedExceptionally) return "CompletedExceptionally"
    return "Completed"
}
839
// Cancelling or Completing
@Suppress("UNCHECKED_CAST")
private class Finishing(
    override val list: NodeList,
    @JvmField val cancelled: Cancelled?, /* != null when cancelling */
    @JvmField val completing: Boolean /* true when completing */
) : Incomplete {
    override val isActive: Boolean get() = cancelled == null

    /*
     * Holder of the exceptions collected so far. It is one of:
     *   null            -- not cancelling, or already sealed: no exceptions are accepted
     *   NOT_INITIALIZED -- accepting exceptions, none recorded yet
     *   Throwable       -- exactly one exception recorded
     *   MutableList     -- two or more exceptions recorded
     */
    private var _exceptionsHolder: Any? = if (cancelled == null) null else NOT_INITIALIZED

    /**
     * Exceptions recorded so far. Empty when nothing was recorded, and also when this state does not
     * accept exceptions (it is not cancelling, or it was sealed).
     */
    val exceptions: List<Throwable> get() {
        // read the holder once so that all checks below see the same value
        val holder = _exceptionsHolder
        return when (holder) {
            null, NOT_INITIALIZED -> emptyList()
            is Throwable -> listOf(holder) // EA should handle this
            else -> holder as List<Throwable>
        }
    }

    /**
     * Records [exception] while holding the monitor of this state.
     *
     * @return `true` if the exception was recorded, `false` if this state does not accept exceptions
     *         (it is not cancelling, or it was sealed) and the caller has to handle it itself.
     */
    fun addException(exception: Throwable): Boolean =
        synchronized(this) {
            if (_exceptionsHolder == null) {
                false
            } else {
                addLocked(exception)
                true
            }
        }

    /**
     * Records [exception] without any locking of its own.
     * NOTE(review): the name suggests the caller already holds the monitor of this state -- confirm at call sites.
     */
    fun addLocked(exception: Throwable) {
        // Cannot be null at this point here
        val holder = _exceptionsHolder
        when (holder) {
            NOT_INITIALIZED -> {
                _exceptionsHolder = exception
            }
            is Throwable -> {
                // second exception: switch from a single value to a list
                val list = ArrayList<Throwable>(4)
                list.add(holder)
                list.add(exception)
                _exceptionsHolder = list
            }
            else -> (holder as MutableList<Throwable>).add(exception)
        }
    }

    /**
     * Seals current state. After [seal] call all consecutive calls to [addException]
     * return `false` forcing callers to handle pending exception by themselves.
     * This call should be guarded by `synchronized(finishingState)`
     */
    fun seal() {
        _exceptionsHolder = null
    }

}
905
// True only for a Finishing state that carries a cancellation
private val Incomplete.isCancelling: Boolean
    get() = (this as? Finishing)?.cancelled != null
908
909 /*
910 * =================================================================================================
     * This is a ready-to-use implementation of the Deferred interface.
     * However, it is not type-safe. Conceptually, it just exposes the value of the underlying
     * completed state as `Any?`.
914 * =================================================================================================
915 */
916
// True when the current state is a CompletedExceptionally instance
public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
918
/**
 * Returns the cause this job has completed exceptionally with, or `null` if it has completed with
 * any other state.
 *
 * @throws IllegalStateException if this job has not completed yet.
 */
public fun getCompletionExceptionOrNull(): Throwable? {
    val finalState = this.state
    if (finalState is Incomplete) throw IllegalStateException("This job has not completed yet")
    return (finalState as? CompletedExceptionally)?.cause
}
924
/**
 * Returns the completed state of this job, rethrowing the cause if it has completed exceptionally.
 *
 * @throws IllegalStateException if this job has not completed yet.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun getCompletedInternal(): Any? {
    val finalState = this.state
    check(finalState !is Incomplete) { "This job has not completed yet" }
    return when (finalState) {
        is CompletedExceptionally -> throw finalState.cause
        else -> finalState
    }
}
934
/**
 * Awaits completion of this job and returns its completed state, throwing the cause if the job
 * has completed exceptionally.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal suspend fun awaitInternal(): Any? {
    // fast-path -- check state (avoid extra object creation)
    do { // lock-free loop on state
        val current = this.state
        if (current !is Incomplete) {
            // already complete -- just return result
            when (current) {
                is CompletedExceptionally -> throw current.cause
                else -> return current
            }
        }
    } while (startInternal(current) < 0) // a negative result means retry
    return awaitSuspend() // slow-path
}
952
// Slow path of awaitInternal: suspends until the completion handler resumes the continuation
private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
    val handle = invokeOnCompletion {
        val finalState = this.state
        check(finalState !is Incomplete)
        when (finalState) {
            is CompletedExceptionally -> cont.resumeWithException(finalState.cause)
            else -> cont.resume(finalState)
        }
    }
    // We have to invoke await() handler only on cancellation, on completion we will be resumed regularly without handlers
    cont.disposeOnCancellation(handle)
}
964
/**
 * Registers an await-like clause of [select] on this job: selects immediately when the job is
 * already complete, otherwise starts the job if needed and registers a completion waiter.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
// registerSelectAwaitInternal
@Suppress("UNCHECKED_CAST")
internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) {
    // fast-path -- check state and select/return if needed
    loopOnState { current ->
        if (select.isSelected) return
        if (current is Incomplete) {
            // any non-zero result from startInternal sends us around the loop again
            if (startInternal(current) != 0) return@loopOnState
            // slow-path -- register waiter for completion
            val waiter = SelectAwaitOnCompletion(this, select, block)
            select.disposeOnSelect(invokeOnCompletion(handler = waiter.asHandler))
            return
        }
        // already complete -- select result
        if (select.trySelect(null)) {
            when (current) {
                is CompletedExceptionally -> select.resumeSelectCancellableWithException(current.cause)
                else -> block.startCoroutineUndispatched(current as T, select.completion)
            }
        }
        return
    }
}
991
/**
 * Delivers the current state of this job to an already selected [select] clause: resumes it with
 * the cause on exceptional completion, otherwise starts [block] with the state as its argument.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
@Suppress("UNCHECKED_CAST")
internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
    val result = this.state
    // Note: await is non-atomic (can be cancelled while dispatched)
    when (result) {
        is CompletedExceptionally -> select.resumeSelectCancellableWithException(result.cause)
        else -> block.startCoroutineCancellable(result as T, select.completion)
    }
}
1004}
1005
1006// --------------- helper classes to simplify job implementation
1007
// Values of `onCancelMode` (JobImpl below uses ON_CANCEL_MAKE_COMPLETING)
internal const val ON_CANCEL_MAKE_CANCELLING = 0
internal const val ON_CANCEL_MAKE_COMPLETING = 1

// Result codes of JobSupport.makeCompletingInternal
private const val COMPLETING_ALREADY_COMPLETING = 0
private const val COMPLETING_COMPLETED = 1
private const val COMPLETING_WAITING_CHILDREN = 2

// Tri-state result codes, returned by NodeList.tryMakeActive
private const val RETRY = -1
private const val FALSE = 0
private const val TRUE = 1
1018
// Shared Empty state instances: the one with isActive = false (New) and the one with isActive = true (Active)
@Suppress("PrivatePropertyName")
private val EmptyNew = Empty(false)
@Suppress("PrivatePropertyName")
private val EmptyActive = Empty(true)
1023
// Sentinel used by JobSupport.Finishing to mark an exceptions holder that has no exception recorded yet
private enum class NotInitialized {
    NOT_INITIALIZED
}
1027
// Incomplete state without any listeners: it has no list and is either active or new
private class Empty(override val isActive: Boolean) : Incomplete {
    override val list: NodeList? get() = null

    override fun toString(): String {
        val label = if (isActive) "Active" else "New"
        return "Empty{$label}"
    }
}
1032
// JobSupport that is created in active state, attached to an optional parent,
// and uses ON_CANCEL_MAKE_COMPLETING as its cancel mode
internal class JobImpl(parent: Job? = null) : JobSupport(true) {
    init { initParentJobInternal(parent) }
    override val onCancelMode: Int get() = ON_CANCEL_MAKE_COMPLETING

    // Performs the regular cancellation but ignores its result and always reports false
    override fun cancel(cause: Throwable?): Boolean {
        // JobImpl can't handle an exception, thus always returns false
        super.cancel(cause)
        return false
    }
}
1043
1044// -------- invokeOnCompletion nodes
1045
// Common interface of all state objects of a job that has not completed yet
internal interface Incomplete {
    // whether this state counts as active (as opposed to new or cancelling)
    val isActive: Boolean
    val list: NodeList? // is null only for Empty and JobNode incomplete state objects
}
1050
// Base class of all listener nodes. A node is a completion handler, a disposable handle,
// and an incomplete state object at the same time.
internal abstract class JobNode<out J : Job>(
    @JvmField val job: J
) : CompletionHandlerBase(), DisposableHandle, Incomplete {
    override val isActive: Boolean get() = true
    override val list: NodeList? get() = null
    // Disposing removes this node from its job, which has to be a JobSupport for the cast to succeed
    override fun dispose() = (job as JobSupport).removeNode(this)
}
1058
// Head of the list of listener nodes; it also keeps its own atomic "active" flag
internal class NodeList(
    active: Boolean
) : LockFreeLinkedListHead(), Incomplete {
    private val _active = atomic(if (active) 1 else 0)

    override val isActive: Boolean get() = _active.value != 0
    override val list: NodeList get() = this

    // FALSE when the list is already active, TRUE when this call activated it, RETRY when the update failed
    fun tryMakeActive(): Int = when {
        _active.value != 0 -> FALSE
        _active.compareAndSet(0, 1) -> TRUE
        else -> RETRY
    }

    override fun toString(): String {
        val stateLabel = if (isActive) "{Active}" else "{New}"
        val nodes = ArrayList<JobNode<*>>()
        this.forEach<JobNode<*>> { node -> nodes.add(node) }
        return nodes.joinToString(separator = ", ", prefix = "List$stateLabel[", postfix = "]")
    }
}
1085
// Node that passes the completion cause on to a plain completion handler
private class InvokeOnCompletion(
    job: Job,
    private val handler: CompletionHandler
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        handler(cause)
    }

    override fun toString(): String = "InvokeOnCompletion[$classSimpleName@$hexAddress]"
}
1093
// Node that resumes a continuation with Unit, whatever the cause is
private class ResumeOnCompletion(
    job: Job,
    private val continuation: Continuation<Unit>
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        continuation.resume(Unit)
    }

    override fun toString(): String = "ResumeOnCompletion[$continuation]"
}
1101
// Node that disposes the given handle when it is invoked
internal class DisposeOnCompletion(
    job: Job,
    private val handle: DisposableHandle
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        handle.dispose()
    }

    override fun toString(): String = "DisposeOnCompletion[$handle]"
}
1109
// Node that starts the select block (cancellably) if this clause wins the selection
private class SelectJoinOnCompletion<R>(
    job: JobSupport,
    private val select: SelectInstance<R>,
    private val block: suspend () -> R
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        if (!select.trySelect(null)) return // the select instance was not selected by this call
        block.startCoroutineCancellable(select.completion)
    }

    override fun toString(): String = "SelectJoinOnCompletion[$select]"
}
1121
// Node that hands the job's result over to the select block if this clause wins the selection
private class SelectAwaitOnCompletion<T, R>(
    job: JobSupport,
    private val select: SelectInstance<R>,
    private val block: suspend (T) -> R
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        if (!select.trySelect(null)) return // the select instance was not selected by this call
        job.selectAwaitCompletion(select, block)
    }

    override fun toString(): String = "SelectAwaitOnCompletion[$select]"
}
1133
1134// -------- invokeOnCancellation nodes
1135
1136/**
1137 * Marker for node that shall be invoked on cancellation (in _cancelling_ state).
1138 * **Note: may be invoked multiple times during cancellation.**
1139 */
1140internal abstract class JobCancellationNode<out J : Job>(job: J) : JobNode<J>(job)
1141
// Cancellation node that forwards the cause to a plain handler, but only the first time it is invoked
private class InvokeOnCancellation(
    job: Job,
    private val handler: CompletionHandler
) : JobCancellationNode<Job>(job) {
    // delegate handler shall be invoked at most once, so here is an additional flag
    private val _invoked = atomic(0)

    override fun invoke(cause: Throwable?) {
        if (!_invoked.compareAndSet(0, 1)) return // the flag was already set by an earlier invocation
        handler.invoke(cause)
    }

    override fun toString(): String = "InvokeOnCancellation[$classSimpleName@$hexAddress]"
}
1153
// Cancellation node that links a parent JobSupport to one of its child jobs
internal class ChildJob(
    parent: JobSupport,
    @JvmField val childJob: Job
) : JobCancellationNode<JobSupport>(parent) {
    // The [cause] argument is ignored: the exception is always taken from the parent itself
    override fun invoke(cause: Throwable?) {
        // Always materialize the actual instance of parent's completion exception and cancel child with it
        childJob.cancel(job.getCancellationException())
    }
    override fun toString(): String = "ChildJob[$childJob]"
}
1164
// Same as ChildJob, but for cancellable continuation
internal class ChildContinuation(
    parent: Job,
    @JvmField val child: AbstractContinuation<*>
) : JobCancellationNode<Job>(parent) {
    // The [cause] argument is ignored: the child is cancelled with the parent's cancellation exception
    override fun invoke(cause: Throwable?) {
        child.cancel(job.getCancellationException())
    }
    override fun toString(): String = "ChildContinuation[$child]"
}
1175
// Node installed on a child's job by JobSupport.tryWaitForChild; when invoked, it lets the parent
// continue its own completion from that child onwards
private class ChildCompletion(
    private val parent: JobSupport,
    private val child: ChildJob,
    private val proposedUpdate: Any?
) : JobNode<Job>(child.childJob) {
    override fun invoke(cause: Throwable?) {
        parent.continueCompleting(child, proposedUpdate)
    }

    // Every other node class in this file identifies itself in debug output; do the same here
    override fun toString(): String = "ChildCompletion[$child, $proposedUpdate]"
}