blob: 1f975e550c285ec0d6658cd9c94586e237cd24cc [file] [log] [blame]
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001package kotlinx.coroutines.experimental
2
3import kotlinx.atomicfu.*
4import kotlinx.coroutines.experimental.internal.*
5import kotlinx.coroutines.experimental.internalAnnotations.*
6import kotlinx.coroutines.experimental.intrinsics.*
7import kotlinx.coroutines.experimental.selects.*
8import kotlin.coroutines.experimental.*
9import kotlin.coroutines.experimental.intrinsics.*
10
/**
 * A concrete implementation of [Job]. It is optionally a child to a parent job.
 * This job is cancelled when the parent is complete, but not vice versa.
 *
 * This is an open class designed for extension by more specific classes that might augment the
 * state and may store additional state information for completed jobs, like their result values.
 *
 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
 * @suppress **This is unstable API and it is subject to change.**
 */
21internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 {
/** Context key of this element; always the [Job] key. */
final override val key: CoroutineContext.Key<*>
    get() = Job
23
24 /*
25 === Internal states ===
26
27 name state class public state description
28 ------ ------------ ------------ -----------
29 EMPTY_N EmptyNew : New no listeners
30 EMPTY_A EmptyActive : Active no listeners
31 SINGLE JobNode : Active a single listener
32 SINGLE+ JobNode : Active a single listener + NodeList added as its next
33 LIST_N NodeList : New a list of listeners (promoted once, does not go back to EmptyNew)
34 LIST_A NodeList : Active a list of listeners (promoted once, does not go back to JobNode/EmptyActive)
35 COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*)
36 CANCELLING Finishing : Cancelling has a list of listeners (promoted once from LIST_*)
37 FINAL_C Cancelled : Cancelled cancelled (final state)
38 FINAL_F Failed : Completed failed for other reason (final state)
39 FINAL_R <any> : Completed produced some result
40
41 === Transitions ===
42
43 New states Active states Inactive states
44
45 +---------+ +---------+ }
46 | EMPTY_N | --+-> | EMPTY_A | ----+ } Empty states
47 +---------+ | +---------+ | }
48 | | | ^ | +----------+
49 | | | | +--> | FINAL_* |
50 | | V | | +----------+
51 | | +---------+ | }
52 | | | SINGLE | ----+ } JobNode states
53 | | +---------+ | }
54 | | | | }
55 | | V | }
56 | | +---------+ | }
57 | +-- | SINGLE+ | ----+ }
58 | +---------+ | }
59 | | |
60 V V |
61 +---------+ +---------+ | }
62 | LIST_N | ----> | LIST_A | ----+ } NodeList states
63 +---------+ +---------+ | }
64 | | | | |
65 | | +--------+ | |
66 | | | V |
67 | | | +------------+ | +------------+ }
68 | +-------> | COMPLETING | --+-- | CANCELLING | } Finishing states
69 | | +------------+ +------------+ }
70 | | | ^
71 | | | |
72 +--------+---------+--------------------+
73
74
75 This state machine and its transition matrix are optimized for the common case when job is created in active
76 state (EMPTY_A) and at most one completion listener is added to it during its life-time.
77
78 Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
79 */
80
// Current internal state. While there are no listeners one of the shared
// EmptyActive / EmptyNew objects is used, selected by the `active` constructor flag.
private val _state = atomic<Any?>(
    when {
        active -> EmptyActive
        else -> EmptyNew
    }
)
83
// Registration of this job in its parent. It is `null` until initParentJobInternal runs and is
// replaced with NonDisposableHandle when there is no parent or after the registration is disposed.
@Volatile
private var parentHandle: DisposableHandle? = null
86
87 // ------------ initialization ------------
88
/**
 * Initializes parent job.
 * It shall be invoked at most once after construction after all other initialization.
 *
 * When [parent] is `null` no registration happens. Otherwise the parent is started, this job is
 * attached to it as a child, and the registration is disposed right away if this job has already completed.
 *
 * @throws IllegalStateException if the parent handle was already initialized.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun initParentJobInternal(parent: Job?) {
    check(parentHandle == null)
    val parentJob = parent ?: run {
        parentHandle = NonDisposableHandle
        return
    }
    // the parent has to be started before a child can be attached to it
    parentJob.start()
    @Suppress("DEPRECATION")
    val registration = parentJob.attachChild(this)
    parentHandle = registration
    // the state is inspected only _after_ the registration is published (see the order of actions in tryUpdateState)
    if (!isCompleted) return
    registration.dispose()
    parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
110
111 // ------------ state query ------------
112
/**
 * Returns current state of this job.
 * If an atomic operation descriptor is found in the state, it is performed first and the state is re-read.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal val state: Any?
    get() {
        _state.loop { current ->
            // anything that is not an in-progress operation descriptor is the actual state
            val descriptor = current as? OpDescriptor ?: return current
            descriptor.perform(this)
        }
    }
123
/**
 * Invokes [block] with the current [state] over and over again; the only way out is a non-local
 * return (or an exception) from [block].
 * @suppress **This is unstable API and it is subject to change.**
 */
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
    while (true) block(state)
}
132
/** `true` when the current state is an incomplete one that reports itself as active. */
public final override val isActive: Boolean
    get() = (state as? Incomplete)?.isActive == true
137
/** `true` once the current state is no longer an [Incomplete] one. */
public final override val isCompleted: Boolean
    get() = when (state) {
        is Incomplete -> false
        else -> true
    }
139
/** `true` when the job is in the final cancelled state or is finishing with a recorded cancellation. */
public final override val isCancelled: Boolean
    get() {
        val current = this.state
        return when (current) {
            is Cancelled -> true
            is Finishing -> current.cancelled != null
            else -> false
        }
    }
144
145 // ------------ state update ------------
146
/**
 * Updates current [state] of this job. Returns `false` if current state is not equal to expected.
 * On success the completion processing for the installed update is performed before returning `true`.
 * @suppress **This is unstable API and it is subject to change.**
 */
private fun updateState(expect: Incomplete, proposedUpdate: Any?, mode: Int): Boolean {
    val update = coerceProposedUpdate(expect, proposedUpdate)
    val installed = tryUpdateState(expect, update)
    if (installed) completeUpdateState(expect, update, mode)
    return installed
}
157
// When the job is in Cancelling state it can only be promoted to Cancelled state. If the proposed
// update is not an appropriate Cancelled (one that preserves the cancellation cause), the corresponding
// Cancelled state is constructed; in every other case the proposed update is used as is.
private fun coerceProposedUpdate(expect: Incomplete, proposedUpdate: Any?): Any? {
    val cancelled = (expect as? Finishing)?.cancelled ?: return proposedUpdate
    if (isCorrespondinglyCancelled(cancelled, proposedUpdate)) return proposedUpdate
    return createCancelled(cancelled, proposedUpdate)
}
164
// Tells whether proposedUpdate is a Cancelled state that may stand in for the recorded `cancelled` one:
// its cause either equals the recorded cause or is a JobCancellationException.
private fun isCorrespondinglyCancelled(cancelled: Cancelled, proposedUpdate: Any?): Boolean {
    val proposed = proposedUpdate as? Cancelled ?: return false
    // NOTE: equality comparison of causes is performed here by design, see equals of JobCancellationException
    return proposed.cause == cancelled.cause || proposed.cause is JobCancellationException
}
170
// Builds the Cancelled state to install when the job was cancelling with `cancelled` and is now being
// completed with proposedUpdate.
private fun createCancelled(cancelled: Cancelled, proposedUpdate: Any?): Cancelled {
    // not exception -- just use original cancelled
    val exceptional = proposedUpdate as? CompletedExceptionally ?: return cancelled
    val exception = exceptional.cause
    // that is the cancelled we need already!
    if (cancelled.cause == exception) return cancelled
    // todo: We need to rework this logic to keep original cancellation cause in the state and suppress other exceptions
    //       that could have occurred while coroutine is being cancelled.
    // Do not spam with JCE in suppressed exceptions
    val recordOriginal = cancelled.cause !is JobCancellationException
    if (recordOriginal) exception.addSuppressedThrowable(cancelled.cause)
    return Cancelled(this, exception)
}
183
/**
 * Tries to initiate update of the current [state] of this job.
 * Returns `false` when the state is no longer [expect]; otherwise the registration in the parent
 * is disposed and `true` is returned.
 *
 * @throws IllegalArgumentException if [update] is itself an incomplete state.
 * @suppress **This is unstable API and it is subject to change.**
 */
private fun tryUpdateState(expect: Incomplete, update: Any?): Boolean {
    require(update !is Incomplete) // only incomplete -> completed transition is allowed
    if (!_state.compareAndSet(expect, update)) return false
    // Unregister from parent job; parentHandle is volatile-read _after_ the state was updated
    val registration = parentHandle
    if (registration != null) {
        registration.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
    return true // continues in completeUpdateState
}
198
/**
 * Completes update of the current [state] of this job: runs the overridable hooks first and
 * then notifies the completion handlers that were registered in the [expect] state.
 * @suppress **This is unstable API and it is subject to change.**
 */
private fun completeUpdateState(expect: Incomplete, update: Any?, mode: Int) {
    val failure = update as? CompletedExceptionally
    // Overridable processing goes before completion handlers;
    // cancellation is reported only when the job was not cancelling before
    if (!expect.isCancelling) onCancellationInternal(failure)
    onCompletionInternal(update, mode)
    // Invoke completion handlers
    val cause = failure?.cause
    when (expect) {
        // SINGLE/SINGLE+ state -- one completion handler (common case)
        is JobNode<*> -> try {
            expect.invoke(cause)
        } catch (ex: Throwable) {
            handleException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
        }
        else -> expect.list?.notifyCompletion(cause)
    }
}
220
// Invokes every node of type T in the list with the given cause. The first handler failure is wrapped
// into CompletionHandlerException, later failures are attached to it as suppressed, and the
// resulting exception (if any) is passed to handleException once all nodes were invoked.
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
    var failure: Throwable? = null
    list.forEach<T> { node ->
        try {
            node.invoke(cause)
        } catch (ex: Throwable) {
            val first = failure
            if (first == null) {
                failure = CompletionHandlerException("Exception in completion handler $node for $this", ex)
            } else {
                first.addSuppressedThrowable(ex)
            }
        }
    }
    failure?.let { handleException(it) }
}
234
// Notifies all nodes of this list about completion.
private fun NodeList.notifyCompletion(cause: Throwable?) {
    notifyHandlers<JobNode<*>>(this, cause)
}
237
// Notifies only the cancellation nodes of the list.
private fun notifyCancellation(list: NodeList, cause: Throwable?) {
    notifyHandlers<JobCancellationNode<*>>(list, cause)
}
240
/**
 * Starts this job if it was not started yet.
 * Returns `true` when this invocation has started it and `false` when [startInternal] reports
 * that the job was not in a new state.
 */
public final override fun start(): Boolean {
    loopOnState { current ->
        val outcome = startInternal(current)
        if (outcome == FALSE) return false
        if (outcome == TRUE) return true
        // any other outcome -- retry with the fresh state
    }
}
249
// returns: RETRY/FALSE/TRUE:
//   FALSE when not new,
//   TRUE  when started
//   RETRY when need to retry
private fun startInternal(state: Any?): Int = when (state) {
    // EMPTY_X state -- no completion handlers
    is Empty -> when {
        state.isActive -> FALSE // already active
        !_state.compareAndSet(state, EmptyActive) -> RETRY
        else -> {
            onStartInternal()
            TRUE
        }
    }
    // LIST -- a list of completion handlers (either new or active)
    is NodeList -> {
        val result = state.tryMakeActive()
        if (result == TRUE) onStartInternal()
        result
    }
    // not a new state
    else -> FALSE
}
270
/**
 * Override to provide the actual [start] action.
 * This function is invoked exactly once when non-active coroutine is [started][start].
 * The default implementation does nothing.
 */
internal open fun onStartInternal() {
    // nothing to do by default
}
276
/**
 * Returns [CancellationException] that describes why this job is cancelling, has failed or has completed.
 *
 * @throws IllegalStateException if this job is still incomplete and no cancellation was recorded for it.
 */
public final override fun getCancellationException(): CancellationException {
    val current = this.state
    if (current is Finishing && current.cancelled != null) {
        return current.cancelled.cause.toCancellationException("Job is being cancelled")
    }
    if (current is Incomplete) {
        error("Job was not completed or cancelled yet: $this")
    }
    if (current is CompletedExceptionally) {
        return current.cause.toCancellationException("Job has failed")
    }
    return JobCancellationException("Job has completed normally", null, this)
}
289
// Returns the receiver when it already is a CancellationException; otherwise wraps it into a
// JobCancellationException with the given message, the receiver as cause and this job.
private fun Throwable.toCancellationException(message: String): CancellationException {
    if (this is CancellationException) return this
    return JobCancellationException(message, this, this@JobSupport)
}
292
/**
 * Returns the cause that signals the completion of this job -- it returns the original
 * [cancel] cause, [JobCancellationException] or **`null` if this job had completed normally**.
 * This function throws [IllegalStateException] when invoked for a job that has not [completed][isCompleted] nor
 * [isCancelled] yet.
 */
protected fun getCompletionCause(): Throwable? {
    val current = this.state
    if (current is Finishing && current.cancelled != null) return current.cancelled.cause
    if (current is Incomplete) error("Job was not completed or cancelled yet")
    if (current is CompletedExceptionally) return current.cause
    return null
}
308
// The three overloads below override deprecated members. Each of them delegates to the main
// invokeOnCompletion implementation and always asks for immediate invocation.

@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)
}

@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle {
    return invokeOnCompletion(onCancelling = onCancelling, invokeImmediately = true, handler = handler)
}

@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(onCancelling_: Boolean, handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onCancelling = onCancelling_, invokeImmediately = true, handler = handler)
}
320
/**
 * Registers [handler] on this job and returns a handle that can be used to remove it.
 *
 * * While the job is incomplete the handler is wrapped into a node (see [makeNode]) and installed
 *   into the state; that node is returned as the handle.
 * * When [onCancelling] is `true` and the job is already being cancelled, nothing is installed:
 *   the handler is invoked with the cancellation cause if [invokeImmediately] is set and
 *   [NonDisposableHandle] is returned.
 * * When the job is already complete nothing is installed either: the handler is invoked with the
 *   failure cause (or `null`) if [invokeImmediately] is set and [NonDisposableHandle] is returned.
 */
// todo: non-final as a workaround for KT-21968, should be final in the future
public override fun invokeOnCompletion(
    onCancelling: Boolean,
    invokeImmediately: Boolean,
    handler: CompletionHandler
): DisposableHandle {
    // the node is created at most once and is reused when the lock-free loop retries
    var nodeCache: JobNode<*>? = null
    loopOnState { state ->
        when (state) {
            is Empty -> { // EMPTY_X state -- no completion handlers
                if (state.isActive) {
                    // try move to SINGLE state
                    val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                    if (_state.compareAndSet(state, node)) return node
                } else
                    promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
            }
            is Incomplete -> {
                val list = state.list
                if (list == null) { // SINGLE/SINGLE+
                    promoteSingleToNodeList(state as JobNode<*>)
                } else {
                    if (state is Finishing && state.cancelled != null && onCancelling) {
                        // installing cancellation handler on job that is being cancelled
                        // :KLUDGE: the handler is invoked via `invokeIt` here as well, for the very same
                        // reason as in the "is complete" branch below
                        if (invokeImmediately) handler.invokeIt(state.cancelled.cause)
                        return NonDisposableHandle
                    }
                    val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                    if (addLastAtomic(state, list, node)) return node
                }
            }
            else -> { // is complete
                // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
                return NonDisposableHandle
            }
        }
    }
}
361
// Produces the list node for the handler. A handler that already is a node of the required kind is
// reused after checking that it belongs to this job; any other handler is wrapped into
// InvokeOnCancellation / InvokeOnCompletion.
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
    if (onCancelling) {
        val cancellationNode = handler as? JobCancellationNode<*>
            ?: return InvokeOnCancellation(this, handler)
        require(cancellationNode.job === this)
        return cancellationNode
    }
    val completionNode = handler as? JobNode<*>
        ?: return InvokeOnCompletion(this, handler)
    require(completionNode.job === this && completionNode !is JobCancellationNode)
    return completionNode
}
370
// Appends the node to the list on the condition that the job's state is still `expect`.
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>): Boolean {
    return list.addLastIf(node) { this.state === expect }
}
373
// Attempts to replace the empty state with a fresh NodeList that keeps the same active flag.
// The outcome of the CAS is ignored; callers re-read the state on their next pass.
private fun promoteEmptyToNodeList(state: Empty) {
    val list = NodeList(state.isActive)
    _state.compareAndSet(state, list)
}
378
// Attempts to replace the single-node state with a list state.
private fun promoteSingleToNodeList(state: JobNode<*>) {
    // try to promote it to list (SINGLE+ state)
    state.addOneIfEmpty(NodeList(active = true))
    // it must be in SINGLE+ state or state has changed (node could have been removed from state);
    // the next node is either our NodeList or the one of somebody who won the race
    val promoted = state.nextNode
    // just attempt converting it to list if state is still the same, then we'll continue lock-free loop
    _state.compareAndSet(state, promoted)
}
387
/**
 * Suspends until this job is complete. When the job is already complete it does not suspend and
 * only checks the completion of the caller's context.
 */
public final override suspend fun join() {
    if (joinInternal()) return joinSuspend() // slow-path wait
    // fast-path no wait
    return suspendCoroutineOrReturn { cont ->
        cont.context.checkCompletion()
        Unit // do not suspend
    }
}
397
// Returns false when the job is already complete (no need to wait) and true when the caller has to
// wait; the job is started on the way if needed.
private fun joinInternal(): Boolean {
    loopOnState { current ->
        when {
            current !is Incomplete -> return false // not active anymore (complete) -- no need to wait
            startInternal(current) >= 0 -> return true // wait unless need to retry
        }
    }
}
404
// Slow path of join: registers a ResumeOnCompletion handler for the continuation on this job.
private suspend fun joinSuspend(): Unit = suspendCancellableCoroutine { cont ->
    // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
    val registration = invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler)
    cont.disposeOnCancellation(registration)
}
409
/** Select clause for [join]; this job itself implements it. */
public final override val onJoin: SelectClause0 get() = this
412
// registerSelectJoin
// Registers the join clause of this job in the given select instance.
public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
    // fast-path -- check state and select/return if needed
    loopOnState { current ->
        if (select.isSelected) return
        if (current !is Incomplete) {
            // already complete -- select result
            val won = select.trySelect(null)
            if (won) {
                select.completion.context.checkCompletion() // always check for our completion
                block.startCoroutineUndispatched(select.completion)
            }
            return
        }
        val startOutcome = startInternal(current)
        if (startOutcome == 0) {
            // slow-path -- register waiter for completion
            val waiter = SelectJoinOnCompletion(this, select, block).asHandler
            select.disposeOnSelect(invokeOnCompletion(handler = waiter))
            return
        }
    }
}
433
/**
 * Removes [node] from this job; what exactly has to be done depends on the current state.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun removeNode(node: JobNode<*>) {
    loopOnState { current ->
        if (current is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler
            if (current !== node) return // a different job node --> we were already removed
            // try remove and revert back to empty state
            if (_state.compareAndSet(current, EmptyActive)) return
        } else {
            // an incomplete state may have a list of completion handlers -- remove node from it;
            // a complete state does not have any completion handlers
            if (current is Incomplete && current.list != null) node.remove()
            return
        }
    }
}
455
/**
 * Selects how [cancel] is processed; the default is [ON_CANCEL_MAKE_CANCELLING].
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open val onCancelMode: Int
    get() = ON_CANCEL_MAKE_CANCELLING
460
/**
 * Cancels this job with an optional [cause], dispatching on [onCancelMode].
 *
 * @throws IllegalStateException if [onCancelMode] has an unsupported value.
 */
public override fun cancel(cause: Throwable?): Boolean {
    val selectedMode = onCancelMode
    return when (selectedMode) {
        ON_CANCEL_MAKE_CANCELLING -> makeCancelling(cause)
        ON_CANCEL_MAKE_COMPLETING -> makeCompletingOnCancel(cause)
        else -> error("Invalid onCancelMode $onCancelMode")
    }
}
466
// Moves the job from the given state straight to Cancelled.
// we will be dispatching coroutine to process its cancellation exception, so there is no need for
// an extra check for Job status in MODE_CANCELLABLE
private fun updateStateCancelled(state: Incomplete, cause: Throwable?): Boolean {
    val cancelled = Cancelled(this, cause)
    return updateState(state, cancelled, mode = MODE_ATOMIC_DEFAULT)
}
471
// transitions to Cancelling state
// Returns true when this invocation has cancelled the job and false when the job was already
// cancelling or is inactive.
private fun makeCancelling(cause: Throwable?): Boolean {
    loopOnState { current ->
        when (current) {
            is Empty -> { // EMPTY_X state -- no completion handlers
                if (!current.isActive) {
                    // cancelling a non-started coroutine makes it immediately cancelled
                    // (and we have no listeners to notify which makes it very simple)
                    if (updateStateCancelled(current, cause)) return true
                } else {
                    promoteEmptyToNodeList(current) // this way can wrap it into Cancelling on next pass
                }
            }
            is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
                promoteSingleToNodeList(current)
            }
            is NodeList -> { // LIST -- a list of completion handlers (either new or active)
                val done = if (current.isActive) {
                    tryMakeCancelling(current, current.list, cause)
                } else {
                    // cancelling a non-started coroutine makes it immediately cancelled
                    updateStateCancelled(current, cause)
                }
                if (done) return true
            }
            is Finishing -> { // Completing/Cancelling the job, may cancel
                if (current.cancelled != null) return false // already cancelling
                if (tryMakeCancelling(current, current.list, cause)) return true
            }
            else -> return false // is inactive
        }
    }
}
507
// try make expected state in cancelling on the condition that we're still in this state;
// on success the finishing/cancellation hooks run first and the cancellation handlers of the list after them
private fun tryMakeCancelling(expect: Incomplete, list: NodeList, cause: Throwable?): Boolean {
    val cancelled = Cancelled(this, cause)
    val cancelling = Finishing(list, cancelled, false)
    if (_state.compareAndSet(expect, cancelling)) {
        onFinishingInternal(cancelled)
        onCancellationInternal(cancelled)
        notifyCancellation(list, cause)
        return true
    }
    return false
}
517
// Cancellation for ON_CANCEL_MAKE_COMPLETING mode: completes the job with a Cancelled update.
private fun makeCompletingOnCancel(cause: Throwable?): Boolean {
    return makeCompleting(Cancelled(this, cause))
}
520
/**
 * Makes this job completing with [proposedUpdate].
 * Returns `false` only when the job was already complete or completing.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun makeCompleting(proposedUpdate: Any?): Boolean {
    val outcome = makeCompletingInternal(proposedUpdate, mode = MODE_ATOMIC_DEFAULT)
    return outcome != COMPLETING_ALREADY_COMPLETING
}
529
/**
 * Returns:
 * * `true` if state was updated to completed/cancelled;
 * * `false` if made completing or it is cancelling and is waiting for children.
 *
 * @throws IllegalStateException if job is already complete or completing
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean {
    val outcome = makeCompletingInternal(proposedUpdate, mode)
    return when (outcome) {
        COMPLETING_COMPLETED -> true
        COMPLETING_WAITING_CHILDREN -> false
        else -> throw IllegalStateException("Job $this is already complete or completing, " +
            "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
    }
}
545
// Tries to complete the job with proposedUpdate and returns one of the COMPLETING_* codes:
//   COMPLETING_ALREADY_COMPLETING -- the job is already complete or completing
//   COMPLETING_COMPLETED          -- the final state was installed
//   COMPLETING_WAITING_CHILDREN   -- the job was made completing and now waits for a child
private fun makeCompletingInternal(proposedUpdate: Any?, mode: Int): Int {
    loopOnState { state ->
        if (state !is Incomplete)
            return COMPLETING_ALREADY_COMPLETING
        if (state is Finishing && state.completing)
            return COMPLETING_ALREADY_COMPLETING
        val child: ChildJob? = firstChild(state)
        if (child == null) {
            // no children -- complete immediately, unless it has onFinishing handler
            val needsFinishingStep = state !is Finishing && hasOnFinishingHandler(proposedUpdate)
            if (!needsFinishingStep) {
                if (updateState(state, proposedUpdate, mode)) return COMPLETING_COMPLETED
                return@loopOnState // retry
            }
        }
        val list = state.list
        if (list == null) {
            // must promote to list to correctly operate on child lists
            when (state) {
                is Empty -> promoteEmptyToNodeList(state)
                is JobNode<*> -> promoteSingleToNodeList(state)
                else -> error("Unexpected state with an empty list: $state")
            }
            return@loopOnState // retry
        }
        // cancel all children in list on exceptional completion
        if (proposedUpdate is CompletedExceptionally)
            child?.cancelChildrenInternal(proposedUpdate.cause)
        // switch to completing state
        val cancelled = (state as? Finishing)?.cancelled ?: (proposedUpdate as? Cancelled)
        val completing = Finishing(list, cancelled, true)
        if (_state.compareAndSet(state, completing)) {
            if (state !is Finishing) onFinishingInternal(proposedUpdate)
            if (child != null && tryWaitForChild(child, proposedUpdate))
                return COMPLETING_WAITING_CHILDREN
            if (updateState(completing, proposedUpdate, mode = MODE_ATOMIC_DEFAULT))
                return COMPLETING_COMPLETED
        }
    }
}
585
// Cancels the job of the receiver and of every child node that follows it (as found by nextChild),
// wrapping the cause into a JobCancellationException for each of them.
private fun ChildJob.cancelChildrenInternal(cause: Throwable) {
    var child: ChildJob? = this
    while (child != null) {
        val job = child.childJob
        job.cancel(JobCancellationException("Child job was cancelled because of parent failure", cause, child.childJob))
        child = child.nextChild()
    }
}
590
// The cause of an exceptionally completed state, or null for any other value.
private val Any?.exceptionOrNull: Throwable?
    get() = if (this is CompletedExceptionally) this.cause else null
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300593
// The first child of the state: the state itself when it is a ChildJob node, otherwise the first
// child found in its list (if there is a list).
private fun firstChild(state: Incomplete): ChildJob? {
    if (state is ChildJob) return state
    return state.list?.nextChild()
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300596
// return false when there is no more incomplete children to wait;
// return true when a completion handler was installed on some child and we are waiting for it
private fun tryWaitForChild(child: ChildJob, proposedUpdate: Any?): Boolean {
    var candidate = child
    while (true) {
        val handle = candidate.childJob.invokeOnCompletion(invokeImmediately = false,
            handler = ChildCompletion(this, candidate, proposedUpdate).asHandler)
        if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
        candidate = candidate.nextChild() ?: return false
    }
}
605
/**
 * Continues completion of this job after [lastChild]: starts waiting for the next child found after
 * [lastChild] or, when there are no more children to wait for, installs [proposedUpdate] as the state.
 *
 * @throws IllegalStateException if this job is found in a state other than finishing.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun continueCompleting(lastChild: ChildJob, proposedUpdate: Any?) {
    loopOnState { state ->
        if (state !is Finishing)
            throw IllegalStateException("Job $this is found in unexpected state while completing with $proposedUpdate", proposedUpdate.exceptionOrNull)
        // figure out if we need to wait for next child
        val waitChild = lastChild.nextChild()
        // try wait for next child
        if (waitChild != null && tryWaitForChild(waitChild, proposedUpdate)) return // waiting for next child
        // no more children to wait -- try update state
        if (updateState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return
    }
}
621
// Finds the next non-removed ChildJob node after the receiver; returns null when the NodeList
// (the list head) is reached first.
private fun LockFreeLinkedListNode.nextChild(): ChildJob? {
    var cursor = this
    // rollback to prev non-removed (or list head)
    while (cursor.isRemoved) cursor = cursor.prevNode
    while (true) {
        cursor = cursor.nextNode
        if (!cursor.isRemoved) {
            if (cursor is ChildJob) return cursor
            if (cursor is NodeList) return null // checked all -- no more children
        }
    }
}
632
/**
 * Sequence of the child jobs found in the state of this job at the moment the sequence is iterated.
 */
public final override val children: Sequence<Job>
    get() = buildSequence {
        val current = this@JobSupport.state
        if (current is ChildJob) {
            // the only node is a child
            yield(current.childJob)
        } else if (current is Incomplete) {
            current.list?.forEach<ChildJob> { child -> yield(child.childJob) }
        }
    }
642
/**
 * Attaches [child] to this job by registering a [ChildJob] node as an `onCancelling` handler.
 * Returns the handle of that registration.
 */
@Suppress("OverridingDeprecatedMember")
public final override fun attachChild(child: Job): DisposableHandle {
    return invokeOnCompletion(onCancelling = true, handler = ChildJob(this, child).asHandler)
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300646
// Overrides the deprecated member and forwards to the `cancelChildren` extension function.
// NOTE(review): this relies on the call being resolved to the extension and not to this member
// (presumably because the deprecated member is hidden from resolution) -- confirm.
@Suppress("OverridingDeprecatedMember")
public final override fun cancelChildren(cause: Throwable?) {
    this.cancelChildren(cause) // use extension function
}
651
/**
 * Override to process any exceptions that were encountered while invoking completion handlers
 * installed via [invokeOnCompletion].
 * The default implementation rethrows [exception].
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun handleException(exception: Throwable) {
    // by default the failure of a handler is propagated to whoever has triggered the notification
    throw exception
}
660
/**
 * This function is invoked once when job is cancelled or is completed, similarly to [invokeOnCompletion] with
 * `onCancelling` set to `true`.
 * @param exceptionally not null when the job was cancelled or completed exceptionally,
 *               null when it has completed normally.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
    // nothing to do by default
}
669
/**
 * Tells whether [onFinishingInternal] has to be invoked for the given [update] even when there
 * are no children to wait for. The default implementation returns `false`.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun hasOnFinishingHandler(update: Any?): Boolean = false
674
/**
 * Invoked when this job moves to a finishing (completing or cancelling) state with the given [update].
 * The default implementation does nothing.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun onFinishingInternal(update: Any?) {
    // nothing to do by default
}
679
/**
 * Override for post-completion actions that need to do something with the state.
 * The default implementation does nothing.
 * @param mode completion mode.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun onCompletionInternal(state: Any?, mode: Int) {
    // nothing to do by default
}
686
// for nicer debugging: name{state}@address
public override fun toString(): String {
    return nameString() + "{" + stateString() + "}@" + hexAddress
}
690
/**
 * Name of this job as it is shown by [toString]; the simple class name by default.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun nameString(): String {
    return classSimpleName
}
695
// Human-readable name of the current state for toString.
private fun stateString(): String {
    val current = this.state
    if (current is Finishing) {
        // a finishing state may be cancelling, completing or both at the same time
        return buildString {
            if (current.cancelled != null) append("Cancelling")
            if (current.completing) append("Completing")
        }
    }
    if (current is Incomplete) return if (current.isActive) "Active" else "New"
    if (current is Cancelled) return "Cancelled"
    if (current is CompletedExceptionally) return "CompletedExceptionally"
    return "Completed"
}
709
// State of a job that is Cancelling or Completing (or both).
//   list       -- the listeners of the job
//   cancelled  -- != null when cancelling
//   completing -- true when completing
private class Finishing(
    override val list: NodeList,
    @JvmField val cancelled: Cancelled?,
    @JvmField val completing: Boolean
) : Incomplete {
    // considered active for as long as no cancellation was recorded
    override val isActive: Boolean
        get() = cancelled == null
}
718
// true only for a Finishing state with a recorded cancellation
private val Incomplete.isCancelling: Boolean
    get() = (this as? Finishing)?.cancelled != null
721
722 /*
723 * =================================================================================================
724 * This is ready-to-use implementation for Deferred interface.
725 * However, it is not type-safe. Conceptually it just exposes the value of the underlying
726 * completed state as `Any?`
727 * =================================================================================================
728 */
729
// `true` when the current state is a CompletedExceptionally instance
public val isCompletedExceptionally: Boolean
    get() {
        return state is CompletedExceptionally
    }
731
/**
 * Returns the `exceptionOrNull` value of the completed state.
 *
 * @throws IllegalStateException if this job is still in an [Incomplete] state.
 */
public fun getCompletionExceptionOrNull(): Throwable? =
    this.state.let { finalState ->
        check(finalState !is Incomplete) { "This job has not completed yet" }
        finalState.exceptionOrNull
    }
737
/**
 * Returns the completed state of this job, or throws the cause of the state
 * when the job has completed exceptionally.
 *
 * @throws IllegalStateException if this job is still in an [Incomplete] state.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun getCompletedInternal(): Any? {
    val finalState = this.state
    check(finalState !is Incomplete) { "This job has not completed yet" }
    return when (finalState) {
        is CompletedExceptionally -> throw finalState.cause
        else -> finalState
    }
}
747
/**
 * Returns the completed state of this job, throwing the cause when it has completed exceptionally.
 * While the job is still incomplete, `startInternal` is called on the observed state; a negative
 * result repeats the loop, any other result falls through to the suspending slow-path.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal suspend fun awaitInternal(): Any? {
    // fast-path -- inspect the state first (avoids extra object creation)
    do { // lock-free loop on state
        val current = this.state
        if (current !is Incomplete) {
            // already complete -- hand the result back right away
            if (current is CompletedExceptionally) throw current.cause
            return current
        }
        val needsRetry = startInternal(current) < 0
    } while (needsRetry) // leave the loop unless a retry is needed
    return awaitSuspend() // slow-path
}
765
// Slow-path of awaitInternal: suspends and resumes the continuation from a completion handler
private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
    val completionHandle = invokeOnCompletion {
        val finalState = this.state
        check(finalState !is Incomplete)
        when (finalState) {
            is CompletedExceptionally -> cont.resumeWithException(finalState.cause)
            else -> cont.resume(finalState)
        }
    }
    // We have to invoke await() handler only on cancellation, on completion we will be resumed regularly without handlers
    cont.disposeOnCancellation(completionHandle)
}
777
/**
 * Registers an await-style select clause on this job.
 *
 * If the job is already complete and [select] can still be selected, either resumes the select with
 * the cause of the exceptional state or starts [block] undispatched with the completed state.
 * Otherwise, once `startInternal` returns zero, a [SelectAwaitOnCompletion] handler is installed
 * and disposed on select.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
@Suppress("UNCHECKED_CAST")
internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) {
    // fast-path -- inspect the state and select/return when possible
    loopOnState { current ->
        if (select.isSelected) return
        if (current !is Incomplete) {
            // already complete -- select the result
            if (select.trySelect(null)) {
                when (current) {
                    is CompletedExceptionally -> select.resumeSelectCancellableWithException(current.cause)
                    else -> block.startCoroutineUndispatched(current as T, select.completion)
                }
            }
            return
        }
        if (startInternal(current) == 0) {
            // slow-path -- register a waiter for completion
            val waiter = SelectAwaitOnCompletion(this, select, block)
            select.disposeOnSelect(invokeOnCompletion(handler = waiter.asHandler))
            return
        }
    }
}
804
/**
 * Completes an await-style select clause from the current state: resumes [select] with the cause
 * when the state is exceptional, otherwise starts [block] cancellably with the state as its argument.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
@Suppress("UNCHECKED_CAST")
internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
    val finalState = this.state
    // Note: await is non-atomic (can be cancelled while dispatched)
    when (finalState) {
        is CompletedExceptionally -> select.resumeSelectCancellableWithException(finalState.cause)
        else -> block.startCoroutineCancellable(finalState as T, select.completion)
    }
}
817}
818
819// --------------- helper classes to simplify job implementation
820
// Values for the `onCancelMode` property (JobImpl below reports ON_CANCEL_MAKE_COMPLETING).
// NOTE(review): judging by the names, they select the state a job is moved to on cancel -- confirm at the usage site.
internal const val ON_CANCEL_MAKE_CANCELLING = 0
internal const val ON_CANCEL_MAKE_COMPLETING = 1
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300823
// NOTE(review): presumably the possible outcomes of an attempt to make the job completing;
// the usage site is not part of this section -- confirm there.
private const val COMPLETING_ALREADY_COMPLETING = 0
private const val COMPLETING_COMPLETED = 1
private const val COMPLETING_WAITING_CHILDREN = 2

// Tri-state result codes, returned for example by NodeList.tryMakeActive()
private const val RETRY = -1
private const val FALSE = 0
private const val TRUE = 1
831
// Shared instances of the listener-less state: one in the "New" flavour, one in the "Active" flavour
@Suppress("PrivatePropertyName")
private val EmptyNew = Empty(isActive = false)
@Suppress("PrivatePropertyName")
private val EmptyActive = Empty(isActive = true)
836
// Incomplete state without any listeners; it never has a node list
private class Empty(override val isActive: Boolean) : Incomplete {
    override val list: NodeList?
        get() = null

    override fun toString(): String {
        val label = if (isActive) "Active" else "New"
        return "Empty{${label}}"
    }
}
841
// Plain job created in the active state, optionally attached to a parent job
internal class JobImpl(parent: Job? = null) : JobSupport(true) {
    init {
        initParentJobInternal(parent)
    }

    override val onCancelMode: Int
        get() = ON_CANCEL_MAKE_COMPLETING
}
846
847// -------- invokeOnCompletion nodes
848
/**
 * Common shape of the state objects that represent a job which has not completed yet.
 */
internal interface Incomplete {
    /** Whether this state counts as active. */
    val isActive: Boolean

    /** List of nodes attached to the state; is null only for Empty and JobNode incomplete state objects. */
    val list: NodeList?
}
853
/**
 * Base class for handler nodes attached to a [job]. A node is also an [Incomplete] state object
 * on its own: it reports itself as active and has no list. Disposing a node removes it from its job.
 */
internal abstract class JobNode<out J : Job>(
    @JvmField val job: J
) : CompletionHandlerBase(), DisposableHandle, Incomplete {
    override val isActive: Boolean
        get() = true

    override val list: NodeList?
        get() = null

    override fun dispose() {
        // The cast fails for a job that is not a JobSupport
        (job as JobSupport).removeNode(this)
    }
}
861
/**
 * Head of the list of job nodes that doubles as an [Incomplete] state object.
 * The activity flag is kept in an atomic integer: `0` means not active, any other value means active.
 */
internal class NodeList(
    active: Boolean
) : LockFreeLinkedListHead(), Incomplete {
    private val _active = atomic(if (active) 1 else 0)

    override val isActive: Boolean
        get() = _active.value != 0

    override val list: NodeList
        get() = this

    /**
     * Attempts to flip the list from not active to active.
     * Returns `FALSE` when it is already active, `TRUE` when this call made it active,
     * and `RETRY` when the compare-and-set did not succeed.
     */
    fun tryMakeActive(): Int = when {
        _active.value != 0 -> FALSE
        _active.compareAndSet(0, 1) -> TRUE
        else -> RETRY
    }

    override fun toString(): String {
        val activity = if (isActive) "{Active}" else "{New}"
        return buildString {
            append("List")
            append(activity)
            append("[")
            var needsSeparator = false
            this@NodeList.forEach<JobNode<*>> { node ->
                if (needsSeparator) append(", ")
                needsSeparator = true
                append(node)
            }
            append("]")
        }
    }
}
888
// Node that forwards its invocation to the wrapped completion handler
private class InvokeOnCompletion(
    job: Job,
    private val handler: CompletionHandler
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        handler.invoke(cause)
    }

    override fun toString(): String = "InvokeOnCompletion[${classSimpleName}@${hexAddress}]"
}
896
// Node that resumes the given continuation with Unit when invoked; the cause is ignored
private class ResumeOnCompletion(
    job: Job,
    private val continuation: Continuation<Unit>
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        continuation.resume(Unit)
    }

    override fun toString(): String = "ResumeOnCompletion[${continuation}]"
}
904
// Node that disposes the given handle when invoked; the cause is ignored
internal class DisposeOnCompletion(
    job: Job,
    private val handle: DisposableHandle
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        handle.dispose()
    }

    override fun toString(): String {
        return "DisposeOnCompletion[${handle}]"
    }
}
912
// Node for a join-style select clause: when invoked, tries to select and then starts the block
private class SelectJoinOnCompletion<R>(
    job: JobSupport,
    private val select: SelectInstance<R>,
    private val block: suspend () -> R
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        // Nothing to do when the select instance cannot be selected
        if (!select.trySelect(null)) return
        block.startCoroutineCancellable(select.completion)
    }

    override fun toString(): String {
        return "SelectJoinOnCompletion[${select}]"
    }
}
924
// Node for an await-style select clause: when invoked, tries to select and then delegates to the job
private class SelectAwaitOnCompletion<T, R>(
    job: JobSupport,
    private val select: SelectInstance<R>,
    private val block: suspend (T) -> R
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        // Nothing to do when the select instance cannot be selected
        if (!select.trySelect(null)) return
        job.selectAwaitCompletion(select, block)
    }

    override fun toString(): String {
        return "SelectAwaitOnCompletion[${select}]"
    }
}
936
937// -------- invokeOnCancellation nodes
938
939/**
940 * Marker for node that shall be invoked on cancellation (in _cancelling_ state).
941 * **Note: may be invoked multiple times during cancellation.**
942 */
943internal abstract class JobCancellationNode<out J : Job>(job: J) : JobNode<J>(job)
944
// Cancellation node that forwards to the wrapped handler, but only on its first invocation
private class InvokeOnCancellation(
    job: Job,
    private val handler: CompletionHandler
) : JobCancellationNode<Job>(job) {
    // the delegate handler shall be invoked at most once, hence this additional flag (0 -> 1 on first use)
    private val _invoked = atomic(0)

    override fun invoke(cause: Throwable?) {
        val firstInvocation = _invoked.compareAndSet(0, 1)
        if (firstInvocation) handler.invoke(cause)
    }

    override fun toString(): String = "InvokeOnCancellation[${classSimpleName}@${hexAddress}]"
}
956
// Cancellation node installed on the parent that cancels the given child job when invoked
internal class ChildJob(
    parent: JobSupport,
    @JvmField val childJob: Job
) : JobCancellationNode<JobSupport>(parent) {
    override fun invoke(cause: Throwable?) {
        // Always materialize the actual instance of parent's completion exception and cancel child with it
        val parentException = job.getCancellationException()
        childJob.cancel(parentException)
    }

    override fun toString(): String {
        return "ChildJob[${childJob}]"
    }
}
967
// Same as ChildJob, but the thing being cancelled is a cancellable continuation
internal class ChildContinuation(
    parent: Job,
    @JvmField val child: AbstractContinuation<*>
) : JobCancellationNode<Job>(parent) {
    override fun invoke(cause: Throwable?) {
        // Cancel the continuation with the parent's cancellation exception
        val parentException = job.getCancellationException()
        child.cancel(parentException)
    }

    override fun toString(): String {
        return "ChildContinuation[${child}]"
    }
}
978
// Node attached to a child's job that, when invoked, lets the parent continue completing
// with the remembered child and proposed update.
private class ChildCompletion(
    private val parent: JobSupport,
    private val child: ChildJob,
    private val proposedUpdate: Any?
) : JobNode<Job>(child.childJob) {
    override fun invoke(cause: Throwable?) {
        parent.continueCompleting(child, proposedUpdate)
    }

    // Every other node type in this file describes itself for debugging; without this override
    // a list dump would show the uninformative default representation for this node.
    override fun toString(): String = "ChildCompletion[$child, $proposedUpdate]"
}