blob: 3ebd95e354bbd03a21954cf706de19ee3effa806 [file] [log] [blame]
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03001/*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03005package kotlinx.coroutines.experimental
6
7import kotlinx.atomicfu.*
8import kotlinx.coroutines.experimental.internal.*
9import kotlinx.coroutines.experimental.internalAnnotations.*
10import kotlinx.coroutines.experimental.intrinsics.*
11import kotlinx.coroutines.experimental.selects.*
12import kotlin.coroutines.experimental.*
13import kotlin.coroutines.experimental.intrinsics.*
14
15/**
16 * A concrete implementation of [Job]. It is optionally a child to a parent job.
17 * This job is cancelled when the parent is complete, but not vise-versa.
18 *
19 * This is an open class designed for extension by more specific classes that might augment the
20 * state and mare store addition state information for completed jobs, like their result values.
21 *
22 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
23 * @suppress **This is unstable API and it is subject to change.**
24 */
25internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 {
26 final override val key: CoroutineContext.Key<*> get() = Job
27
28 /*
29 === Internal states ===
30
31 name state class public state description
32 ------ ------------ ------------ -----------
33 EMPTY_N EmptyNew : New no listeners
34 EMPTY_A EmptyActive : Active no listeners
35 SINGLE JobNode : Active a single listener
36 SINGLE+ JobNode : Active a single listener + NodeList added as its next
37 LIST_N NodeList : New a list of listeners (promoted once, does not got back to EmptyNew)
38 LIST_A NodeList : Active a list of listeners (promoted once, does not got back to JobNode/EmptyActive)
39 COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*)
40 CANCELLING Finishing : Cancelling has a list of listeners (promoted once from LIST_*)
41 FINAL_C Cancelled : Cancelled cancelled (final state)
42 FINAL_F Failed : Completed failed for other reason (final state)
43 FINAL_R <any> : Completed produced some result
44
45 === Transitions ===
46
47 New states Active states Inactive states
48
49 +---------+ +---------+ }
50 | EMPTY_N | --+-> | EMPTY_A | ----+ } Empty states
51 +---------+ | +---------+ | }
52 | | | ^ | +----------+
53 | | | | +--> | FINAL_* |
54 | | V | | +----------+
55 | | +---------+ | }
56 | | | SINGLE | ----+ } JobNode states
57 | | +---------+ | }
58 | | | | }
59 | | V | }
60 | | +---------+ | }
61 | +-- | SINGLE+ | ----+ }
62 | +---------+ | }
63 | | |
64 V V |
65 +---------+ +---------+ | }
66 | LIST_N | ----> | LIST_A | ----+ } NodeList states
67 +---------+ +---------+ | }
68 | | | | |
69 | | +--------+ | |
70 | | | V |
71 | | | +------------+ | +------------+ }
72 | +-------> | COMPLETING | --+-- | CANCELLING | } Finishing states
73 | | +------------+ +------------+ }
74 | | | ^
75 | | | |
76 +--------+---------+--------------------+
77
78
79 This state machine and its transition matrix are optimized for the common case when job is created in active
80 state (EMPTY_A) and at most one completion listener is added to it during its life-time.
81
82 Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
83 */
84
85 // Note: use shared objects while we have no listeners
86 private val _state = atomic<Any?>(if (active) EmptyActive else EmptyNew)
87
88 @Volatile
89 private var parentHandle: DisposableHandle? = null
90
91 // ------------ initialization ------------
92
93 /**
94 * Initializes parent job.
95 * It shall be invoked at most once after construction after all other initialization.
96 * @suppress **This is unstable API and it is subject to change.**
97 */
98 internal fun initParentJobInternal(parent: Job?) {
99 check(parentHandle == null)
100 if (parent == null) {
101 parentHandle = NonDisposableHandle
102 return
103 }
104 parent.start() // make sure the parent is started
105 @Suppress("DEPRECATION")
106 val handle = parent.attachChild(this)
107 parentHandle = handle
108 // now check our state _after_ registering (see updateState order of actions)
109 if (isCompleted) {
110 handle.dispose()
111 parentHandle = NonDisposableHandle // release it just in case, to aid GC
112 }
113 }
114
115 // ------------ state query ------------
116
117 /**
118 * Returns current state of this job.
119 * @suppress **This is unstable API and it is subject to change.**
120 */
121 internal val state: Any? get() {
122 _state.loop { state -> // helper loop on state (complete in-progress atomic operations)
123 if (state !is OpDescriptor) return state
124 state.perform(this)
125 }
126 }
127
128 /**
129 * @suppress **This is unstable API and it is subject to change.**
130 */
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300131 private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300132 while (true) {
133 block(state)
134 }
135 }
136
137 public final override val isActive: Boolean get() {
138 val state = this.state
139 return state is Incomplete && state.isActive
140 }
141
142 public final override val isCompleted: Boolean get() = state !is Incomplete
143
144 public final override val isCancelled: Boolean get() {
145 val state = this.state
146 return state is Cancelled || (state is Finishing && state.cancelled != null)
147 }
148
149 // ------------ state update ------------
150
151 /**
152 * Updates current [state] of this job. Returns `false` if current state is not equal to expected.
153 * @suppress **This is unstable API and it is subject to change.**
154 */
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300155 private fun updateState(expect: Incomplete, proposedUpdate: Any?, mode: Int): Boolean {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300156 val update = coerceProposedUpdate(expect, proposedUpdate)
157 if (!tryUpdateState(expect, update)) return false
158 completeUpdateState(expect, update, mode)
159 return true
160 }
161
162 // when Job is in Cancelling state, it can only be promoted to Cancelled state,
163 // so if the proposed Update is not an appropriate Cancelled (preserving the cancellation cause),
164 // then the corresponding Cancelled state is constructed.
165 private fun coerceProposedUpdate(expect: Incomplete, proposedUpdate: Any?): Any? =
166 if (expect is Finishing && expect.cancelled != null && !isCorrespondinglyCancelled(expect.cancelled, proposedUpdate))
167 createCancelled(expect.cancelled, proposedUpdate) else proposedUpdate
168
169 private fun isCorrespondinglyCancelled(cancelled: Cancelled, proposedUpdate: Any?): Boolean {
170 if (proposedUpdate !is Cancelled) return false
171 // NOTE: equality comparison of causes is performed here by design, see equals of JobCancellationException
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300172 return proposedUpdate.cause == cancelled.cause || proposedUpdate.cause is JobCancellationException
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300173 }
174
175 private fun createCancelled(cancelled: Cancelled, proposedUpdate: Any?): Cancelled {
176 if (proposedUpdate !is CompletedExceptionally) return cancelled // not exception -- just use original cancelled
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300177 val exception = proposedUpdate.cause
178 if (cancelled.cause == exception) return cancelled // that is the cancelled we need already!
179 // todo: We need to rework this logic to keep original cancellation cause in the state and suppress other exceptions
180 // that could have occurred while coroutine is being cancelled.
181 // Do not spam with JCE in suppressed exceptions
182 if (cancelled.cause !is JobCancellationException) {
183 exception.addSuppressedThrowable(cancelled.cause)
184 }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300185 return Cancelled(this, exception)
186 }
187
188 /**
189 * Tries to initiate update of the current [state] of this job.
190 * @suppress **This is unstable API and it is subject to change.**
191 */
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300192 private fun tryUpdateState(expect: Incomplete, update: Any?): Boolean {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300193 require(update !is Incomplete) // only incomplete -> completed transition is allowed
194 if (!_state.compareAndSet(expect, update)) return false
195 // Unregister from parent job
196 parentHandle?.let {
197 it.dispose() // volatile read parentHandle _after_ state was updated
198 parentHandle = NonDisposableHandle // release it just in case, to aid GC
199 }
200 return true // continues in completeUpdateState
201 }
202
203 /**
204 * Completes update of the current [state] of this job.
205 * @suppress **This is unstable API and it is subject to change.**
206 */
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300207 private fun completeUpdateState(expect: Incomplete, update: Any?, mode: Int) {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300208 val exceptionally = update as? CompletedExceptionally
209 // Do overridable processing before completion handlers
210 if (!expect.isCancelling) onCancellationInternal(exceptionally) // only notify when was not cancelling before
211 onCompletionInternal(update, mode)
212 // Invoke completion handlers
213 val cause = exceptionally?.cause
214 if (expect is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
215 try {
216 expect.invoke(cause)
217 } catch (ex: Throwable) {
218 handleException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
219 }
220 } else {
221 expect.list?.notifyCompletion(cause)
222 }
223 }
224
225 private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
226 var exception: Throwable? = null
227 list.forEach<T> { node ->
228 try {
229 node.invoke(cause)
230 } catch (ex: Throwable) {
231 exception?.apply { addSuppressedThrowable(ex) } ?: run {
232 exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
233 }
234 }
235 }
236 exception?.let { handleException(it) }
237 }
238
239 private fun NodeList.notifyCompletion(cause: Throwable?) =
240 notifyHandlers<JobNode<*>>(this, cause)
241
242 private fun notifyCancellation(list: NodeList, cause: Throwable?) =
243 notifyHandlers<JobCancellationNode<*>>(list, cause)
244
245 public final override fun start(): Boolean {
246 loopOnState { state ->
247 when (startInternal(state)) {
248 FALSE -> return false
249 TRUE -> return true
250 }
251 }
252 }
253
254 // returns: RETRY/FALSE/TRUE:
255 // FALSE when not new,
256 // TRUE when started
257 // RETRY when need to retry
258 private fun startInternal(state: Any?): Int {
259 when (state) {
260 is Empty -> { // EMPTY_X state -- no completion handlers
261 if (state.isActive) return FALSE // already active
262 if (!_state.compareAndSet(state, EmptyActive)) return RETRY
263 onStartInternal()
264 return TRUE
265 }
266 is NodeList -> { // LIST -- a list of completion handlers (either new or active)
267 return state.tryMakeActive().also { result ->
268 if (result == TRUE) onStartInternal()
269 }
270 }
271 else -> return FALSE // not a new state
272 }
273 }
274
275 /**
276 * Override to provide the actual [start] action.
277 * This function is invoked exactly once when non-active coroutine is [started][start].
278 */
279 internal open fun onStartInternal() {}
280
281 public final override fun getCancellationException(): CancellationException {
282 val state = this.state
283 return when {
284 state is Finishing && state.cancelled != null ->
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300285 state.cancelled.cause.toCancellationException("Job is being cancelled")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300286 state is Incomplete ->
287 error("Job was not completed or cancelled yet: $this")
288 state is CompletedExceptionally ->
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300289 state.cause.toCancellationException("Job has failed")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300290 else -> JobCancellationException("Job has completed normally", null, this)
291 }
292 }
293
294 private fun Throwable.toCancellationException(message: String): CancellationException =
295 this as? CancellationException ?: JobCancellationException(message, this, this@JobSupport)
296
297 /**
298 * Returns the cause that signals the completion of this job -- it returns the original
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300299 * [cancel] cause, [JobCancellationException] or **`null` if this job had completed normally**.
300 * This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300301 * [isCancelled] yet.
302 */
303 protected fun getCompletionCause(): Throwable? {
304 val state = this.state
305 return when {
306 state is Finishing && state.cancelled != null -> state.cancelled.cause
307 state is Incomplete -> error("Job was not completed or cancelled yet")
308 state is CompletedExceptionally -> state.cause
309 else -> null
310 }
311 }
312
313 @Suppress("OverridingDeprecatedMember")
314 public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
315 invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)
316
317 @Suppress("OverridingDeprecatedMember")
318 public final override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle =
319 invokeOnCompletion(onCancelling = onCancelling, invokeImmediately = true, handler = handler)
320
321 @Suppress("OverridingDeprecatedMember")
322 public final override fun invokeOnCompletion(onCancelling_: Boolean, handler: CompletionHandler): DisposableHandle =
323 invokeOnCompletion(onCancelling = onCancelling_, invokeImmediately = true, handler = handler)
324
325 // todo: non-final as a workaround for KT-21968, should be final in the future
326 public override fun invokeOnCompletion(
327 onCancelling: Boolean,
328 invokeImmediately: Boolean,
329 handler: CompletionHandler
330 ): DisposableHandle {
331 var nodeCache: JobNode<*>? = null
332 loopOnState { state ->
333 when (state) {
334 is Empty -> { // EMPTY_X state -- no completion handlers
335 if (state.isActive) {
336 // try move to SINGLE state
337 val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
338 if (_state.compareAndSet(state, node)) return node
339 } else
340 promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
341 }
342 is Incomplete -> {
343 val list = state.list
344 if (list == null) { // SINGLE/SINGLE+
345 promoteSingleToNodeList(state as JobNode<*>)
346 } else {
347 if (state is Finishing && state.cancelled != null && onCancelling) {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300348 // installing cancellation handler on job that is being cancelled
349 if (invokeImmediately) handler(state.cancelled.cause)
350 return NonDisposableHandle
351 }
352 val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
353 if (addLastAtomic(state, list, node)) return node
354 }
355 }
356 else -> { // is complete
357 // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
358 // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
359 if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
360 return NonDisposableHandle
361 }
362 }
363 }
364 }
365
366 private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300367 return if (onCancelling)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300368 (handler as? JobCancellationNode<*>)?.also { require(it.job === this) }
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300369 ?: InvokeOnCancellation(this, handler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300370 else
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300371 (handler as? JobNode<*>)?.also { require(it.job === this && it !is JobCancellationNode) }
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300372 ?: InvokeOnCompletion(this, handler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300373 }
374
375 private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>) =
376 list.addLastIf(node) { this.state === expect }
377
378 private fun promoteEmptyToNodeList(state: Empty) {
379 // try to promote it to list in new state
380 _state.compareAndSet(state, NodeList(state.isActive))
381 }
382
383 private fun promoteSingleToNodeList(state: JobNode<*>) {
384 // try to promote it to list (SINGLE+ state)
385 state.addOneIfEmpty(NodeList(active = true))
386 // it must be in SINGLE+ state or state has changed (node could have need removed from state)
387 val list = state.nextNode // either our NodeList or somebody else won the race, updated state
388 // just attempt converting it to list if state is still the same, then we'll continue lock-free loop
389 _state.compareAndSet(state, list)
390 }
391
392 public final override suspend fun join() {
393 if (!joinInternal()) { // fast-path no wait
394 return suspendCoroutineOrReturn { cont ->
395 cont.context.checkCompletion()
396 Unit // do not suspend
397 }
398 }
399 return joinSuspend() // slow-path wait
400 }
401
402 private fun joinInternal(): Boolean {
403 loopOnState { state ->
404 if (state !is Incomplete) return false // not active anymore (complete) -- no need to wait
405 if (startInternal(state) >= 0) return true // wait unless need to retry
406 }
407 }
408
409 private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300410 // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
411 cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300412 }
413
414 public final override val onJoin: SelectClause0
415 get() = this
416
417 // registerSelectJoin
418 public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
419 // fast-path -- check state and select/return if needed
420 loopOnState { state ->
421 if (select.isSelected) return
422 if (state !is Incomplete) {
423 // already complete -- select result
424 if (select.trySelect(null)) {
425 select.completion.context.checkCompletion() // always check for our completion
426 block.startCoroutineUndispatched(select.completion)
427 }
428 return
429 }
430 if (startInternal(state) == 0) {
431 // slow-path -- register waiter for completion
432 select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(this, select, block).asHandler))
433 return
434 }
435 }
436 }
437
438 /**
439 * @suppress **This is unstable API and it is subject to change.**
440 */
441 internal fun removeNode(node: JobNode<*>) {
442 // remove logic depends on the state of the job
443 loopOnState { state ->
444 when (state) {
445 is JobNode<*> -> { // SINGE/SINGLE+ state -- one completion handler
446 if (state !== node) return // a different job node --> we were already removed
447 // try remove and revert back to empty state
448 if (_state.compareAndSet(state, EmptyActive)) return
449 }
450 is Incomplete -> { // may have a list of completion handlers
451 // remove node from the list if there is a list
452 if (state.list != null) node.remove()
453 return
454 }
455 else -> return // it is complete and does not have any completion handlers
456 }
457 }
458 }
459
460 /**
461 * @suppress **This is unstable API and it is subject to change.**
462 */
463 internal open val onCancelMode: Int get() = ON_CANCEL_MAKE_CANCELLING
464
465 public override fun cancel(cause: Throwable?): Boolean = when (onCancelMode) {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300466 ON_CANCEL_MAKE_CANCELLING -> makeCancelling(cause)
467 ON_CANCEL_MAKE_COMPLETING -> makeCompletingOnCancel(cause)
468 else -> error("Invalid onCancelMode $onCancelMode")
469 }
470
471 // we will be dispatching coroutine to process its cancellation exception, so there is no need for
472 // an extra check for Job status in MODE_CANCELLABLE
473 private fun updateStateCancelled(state: Incomplete, cause: Throwable?) =
474 updateState(state, Cancelled(this, cause), mode = MODE_ATOMIC_DEFAULT)
475
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300476 // transitions to Cancelling state
477 private fun makeCancelling(cause: Throwable?): Boolean {
478 loopOnState { state ->
479 when (state) {
480 is Empty -> { // EMPTY_X state -- no completion handlers
481 if (state.isActive) {
482 promoteEmptyToNodeList(state) // this way can wrap it into Cancelling on next pass
483 } else {
484 // cancelling a non-started coroutine makes it immediately cancelled
485 // (and we have no listeners to notify which makes it very simple)
486 if (updateStateCancelled(state, cause)) return true
487 }
488 }
489 is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
490 promoteSingleToNodeList(state)
491 }
492 is NodeList -> { // LIST -- a list of completion handlers (either new or active)
493 if (state.isActive) {
494 if (tryMakeCancelling(state, state.list, cause)) return true
495 } else {
496 // cancelling a non-started coroutine makes it immediately cancelled
497 if (updateStateCancelled(state, cause))
498 return true
499 }
500 }
501 is Finishing -> { // Completing/Cancelling the job, may cancel
502 if (state.cancelled != null) return false // already cancelling
503 if (tryMakeCancelling(state, state.list, cause)) return true
504 }
505 else -> { // is inactive
506 return false
507 }
508 }
509 }
510 }
511
512 // try make expected state in cancelling on the condition that we're still in this state
513 private fun tryMakeCancelling(expect: Incomplete, list: NodeList, cause: Throwable?): Boolean {
514 val cancelled = Cancelled(this, cause)
515 if (!_state.compareAndSet(expect, Finishing(list, cancelled, false))) return false
516 onFinishingInternal(cancelled)
517 onCancellationInternal(cancelled)
518 notifyCancellation(list, cause)
519 return true
520 }
521
522 private fun makeCompletingOnCancel(cause: Throwable?): Boolean =
523 makeCompleting(Cancelled(this, cause))
524
525 /**
526 * @suppress **This is unstable API and it is subject to change.**
527 */
528 internal fun makeCompleting(proposedUpdate: Any?): Boolean =
529 when (makeCompletingInternal(proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
530 COMPLETING_ALREADY_COMPLETING -> false
531 else -> true
532 }
533
534 /**
535 * Returns:
536 * * `true` if state was updated to completed/cancelled;
537 * * `false` if made completing or it is cancelling and is waiting for children.
538 *
539 * @throws IllegalStateException if job is already complete or completing
540 * @suppress **This is unstable API and it is subject to change.**
541 */
542 internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean =
543 when (makeCompletingInternal(proposedUpdate, mode)) {
544 COMPLETING_COMPLETED -> true
545 COMPLETING_WAITING_CHILDREN -> false
546 else -> throw IllegalStateException("Job $this is already complete or completing, " +
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300547 "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300548 }
549
550 private fun makeCompletingInternal(proposedUpdate: Any?, mode: Int): Int {
551 loopOnState { state ->
552 if (state !is Incomplete)
553 return COMPLETING_ALREADY_COMPLETING
554 if (state is Finishing && state.completing)
555 return COMPLETING_ALREADY_COMPLETING
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300556 val child: ChildJob? = firstChild(state) ?: // or else complete immediately w/o children
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300557 when {
558 state !is Finishing && hasOnFinishingHandler(proposedUpdate) -> null // unless it has onFinishing handler
559 updateState(state, proposedUpdate, mode) -> return COMPLETING_COMPLETED
560 else -> return@loopOnState
561 }
562 val list = state.list ?: // must promote to list to correctly operate on child lists
563 when (state) {
564 is Empty -> {
565 promoteEmptyToNodeList(state)
566 return@loopOnState // retry
567 }
568 is JobNode<*> -> {
569 promoteSingleToNodeList(state)
570 return@loopOnState // retry
571 }
572 else -> error("Unexpected state with an empty list: $state")
573 }
574 // cancel all children in list on exceptional completion
575 if (proposedUpdate is CompletedExceptionally)
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300576 child?.cancelChildrenInternal(proposedUpdate.cause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300577 // switch to completing state
578 val cancelled = (state as? Finishing)?.cancelled ?: (proposedUpdate as? Cancelled)
579 val completing = Finishing(list, cancelled, true)
580 if (_state.compareAndSet(state, completing)) {
581 if (state !is Finishing) onFinishingInternal(proposedUpdate)
582 if (child != null && tryWaitForChild(child, proposedUpdate))
583 return COMPLETING_WAITING_CHILDREN
584 if (updateState(completing, proposedUpdate, mode = MODE_ATOMIC_DEFAULT))
585 return COMPLETING_COMPLETED
586 }
587 }
588 }
589
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300590 private tailrec fun ChildJob.cancelChildrenInternal(cause: Throwable) {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300591 childJob.cancel(JobCancellationException("Child job was cancelled because of parent failure", cause, childJob))
592 nextChild()?.cancelChildrenInternal(cause)
593 }
594
595 private val Any?.exceptionOrNull: Throwable?
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300596 get() = (this as? CompletedExceptionally)?.cause
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300597
598 private fun firstChild(state: Incomplete) =
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300599 state as? ChildJob ?: state.list?.nextChild()
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300600
601 // return false when there is no more incomplete children to wait
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300602 private tailrec fun tryWaitForChild(child: ChildJob, proposedUpdate: Any?): Boolean {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300603 val handle = child.childJob.invokeOnCompletion(invokeImmediately = false,
604 handler = ChildCompletion(this, child, proposedUpdate).asHandler)
605 if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
606 val nextChild = child.nextChild() ?: return false
607 return tryWaitForChild(nextChild, proposedUpdate)
608 }
609
610 /**
611 * @suppress **This is unstable API and it is subject to change.**
612 */
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300613 internal fun continueCompleting(lastChild: ChildJob, proposedUpdate: Any?) {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300614 loopOnState { state ->
615 if (state !is Finishing)
616 throw IllegalStateException("Job $this is found in expected state while completing with $proposedUpdate", proposedUpdate.exceptionOrNull)
617 // figure out if we need to wait for next child
618 val waitChild = lastChild.nextChild()
619 // try wait for next child
620 if (waitChild != null && tryWaitForChild(waitChild, proposedUpdate)) return // waiting for next child
621 // no more children to wait -- try update state
622 if (updateState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return
623 }
624 }
625
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300626 private fun LockFreeLinkedListNode.nextChild(): ChildJob? {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300627 var cur = this
628 while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
629 while (true) {
630 cur = cur.nextNode
631 if (cur.isRemoved) continue
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300632 if (cur is ChildJob) return cur
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300633 if (cur is NodeList) return null // checked all -- no more children
634 }
635 }
636
637 public final override val children: Sequence<Job> get() = buildSequence {
638 val state = this@JobSupport.state
639 when (state) {
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300640 is ChildJob -> yield(state.childJob)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300641 is Incomplete -> state.list?.let { list ->
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300642 list.forEach<ChildJob> { yield(it.childJob) }
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300643 }
644 }
645 }
646
647 @Suppress("OverridingDeprecatedMember")
648 public final override fun attachChild(child: Job): DisposableHandle =
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300649 invokeOnCompletion(onCancelling = true, handler = ChildJob(this, child).asHandler)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300650
651 @Suppress("OverridingDeprecatedMember")
652 public final override fun cancelChildren(cause: Throwable?) {
653 this.cancelChildren(cause) // use extension function
654 }
655
656 /**
657 * Override to process any exceptions that were encountered while invoking completion handlers
658 * installed via [invokeOnCompletion].
659 * @suppress **This is unstable API and it is subject to change.**
660 */
661 internal open fun handleException(exception: Throwable) {
662 throw exception
663 }
664
665 /**
666 * This function is invoked once when job is cancelled or is completed, similarly to [invokeOnCompletion] with
667 * `onCancelling` set to `true`.
668 * @param exceptionally not null when the the job was cancelled or completed exceptionally,
669 * null when it has completed normally.
670 * @suppress **This is unstable API and it is subject to change.**
671 */
672 internal open fun onCancellationInternal(exceptionally: CompletedExceptionally?) {}
673
674 /**
675 * @suppress **This is unstable API and it is subject to change.**
676 */
677 internal open fun hasOnFinishingHandler(update: Any?) = false
678
679 /**
680 * @suppress **This is unstable API and it is subject to change.**
681 */
682 internal open fun onFinishingInternal(update: Any?) {}
683
684 /**
685 * Override for post-completion actions that need to do something with the state.
686 * @param mode completion mode.
687 * @suppress **This is unstable API and it is subject to change.**
688 */
689 internal open fun onCompletionInternal(state: Any?, mode: Int) {}
690
691 // for nicer debugging
692 public override fun toString(): String =
693 "${nameString()}{${stateString()}}@$hexAddress"
694
695 /**
696 * @suppress **This is unstable API and it is subject to change.**
697 */
698 internal open fun nameString(): String = classSimpleName
699
700 private fun stateString(): String {
701 val state = this.state
702 return when (state) {
703 is Finishing -> buildString {
704 if (state.cancelled != null) append("Cancelling")
705 if (state.completing) append("Completing")
706 }
707 is Incomplete -> if (state.isActive) "Active" else "New"
708 is Cancelled -> "Cancelled"
709 is CompletedExceptionally -> "CompletedExceptionally"
710 else -> "Completed"
711 }
712 }
713
714 // Cancelling or Completing
715 private class Finishing(
716 override val list: NodeList,
717 @JvmField val cancelled: Cancelled?, /* != null when cancelling */
718 @JvmField val completing: Boolean /* true when completing */
719 ) : Incomplete {
720 override val isActive: Boolean get() = cancelled == null
721 }
722
723 private val Incomplete.isCancelling: Boolean
724 get() = this is Finishing && cancelled != null
725
726 /*
727 * =================================================================================================
728 * This is ready-to-use implementation for Deferred interface.
729 * However, it is not type-safe. Conceptually it just exposes the value of the underlying
730 * completed state as `Any?`
731 * =================================================================================================
732 */
733
734 public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
735
736 public fun getCompletionExceptionOrNull(): Throwable? {
737 val state = this.state
738 check(state !is Incomplete) { "This job has not completed yet" }
739 return state.exceptionOrNull
740 }
741
742 /**
743 * @suppress **This is unstable API and it is subject to change.**
744 */
745 internal fun getCompletedInternal(): Any? {
746 val state = this.state
747 check(state !is Incomplete) { "This job has not completed yet" }
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300748 if (state is CompletedExceptionally) throw state.cause
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300749 return state
750 }
751
752 /**
753 * @suppress **This is unstable API and it is subject to change.**
754 */
755 internal suspend fun awaitInternal(): Any? {
756 // fast-path -- check state (avoid extra object creation)
757 while(true) { // lock-free loop on state
758 val state = this.state
759 if (state !is Incomplete) {
760 // already complete -- just return result
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300761 if (state is CompletedExceptionally) throw state.cause
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300762 return state
763
764 }
765 if (startInternal(state) >= 0) break // break unless needs to retry
766 }
767 return awaitSuspend() // slow-path
768 }
769
770 private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
Vsevolod Tolstopyatovf6430f42018-04-17 17:56:32 +0300771 // We have to invoke await() handler only on cancellation, on completion we will be resumed regularly without handlers
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300772 cont.disposeOnCancellation(invokeOnCompletion {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300773 val state = this.state
774 check(state !is Incomplete)
775 if (state is CompletedExceptionally)
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300776 cont.resumeWithException(state.cause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300777 else
778 cont.resume(state)
779 })
780 }
781
782 /**
783 * @suppress **This is unstable API and it is subject to change.**
784 */
785 // registerSelectAwaitInternal
786 @Suppress("UNCHECKED_CAST")
787 internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) {
788 // fast-path -- check state and select/return if needed
789 loopOnState { state ->
790 if (select.isSelected) return
791 if (state !is Incomplete) {
792 // already complete -- select result
793 if (select.trySelect(null)) {
794 if (state is CompletedExceptionally)
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300795 select.resumeSelectCancellableWithException(state.cause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300796 else
797 block.startCoroutineUndispatched(state as T, select.completion)
798 }
799 return
800 }
801 if (startInternal(state) == 0) {
802 // slow-path -- register waiter for completion
803 select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(this, select, block).asHandler))
804 return
805 }
806 }
807 }
808
809 /**
810 * @suppress **This is unstable API and it is subject to change.**
811 */
812 @Suppress("UNCHECKED_CAST")
813 internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
814 val state = this.state
815 // Note: await is non-atomic (can be cancelled while dispatched)
816 if (state is CompletedExceptionally)
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300817 select.resumeSelectCancellableWithException(state.cause)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300818 else
819 block.startCoroutineCancellable(state as T, select.completion)
820 }
821}
822
823// --------------- helper classes to simplify job implementation
824
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300825internal const val ON_CANCEL_MAKE_CANCELLING = 0
826internal const val ON_CANCEL_MAKE_COMPLETING = 1
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300827
828private const val COMPLETING_ALREADY_COMPLETING = 0
829private const val COMPLETING_COMPLETED = 1
830private const val COMPLETING_WAITING_CHILDREN = 2
831
832private const val RETRY = -1
833private const val FALSE = 0
834private const val TRUE = 1
835
836@Suppress("PrivatePropertyName")
837private val EmptyNew = Empty(false)
838@Suppress("PrivatePropertyName")
839private val EmptyActive = Empty(true)
840
841private class Empty(override val isActive: Boolean) : Incomplete {
842 override val list: NodeList? get() = null
843 override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
844}
845
846internal class JobImpl(parent: Job? = null) : JobSupport(true) {
847 init { initParentJobInternal(parent) }
848 override val onCancelMode: Int get() = ON_CANCEL_MAKE_COMPLETING
849}
850
851// -------- invokeOnCompletion nodes
852
853internal interface Incomplete {
854 val isActive: Boolean
855 val list: NodeList? // is null only for Empty and JobNode incomplete state objects
856}
857
Roman Elizarov6d9f40f2018-04-28 14:44:02 +0300858internal abstract class JobNode<out J : Job>(
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300859 @JvmField val job: J
Roman Elizarovdbd9e1c2018-04-28 15:14:18 +0300860) : CompletionHandlerBase(), DisposableHandle, Incomplete {
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300861 override val isActive: Boolean get() = true
862 override val list: NodeList? get() = null
863 override fun dispose() = (job as JobSupport).removeNode(this)
864}
865
866internal class NodeList(
867 active: Boolean
868) : LockFreeLinkedListHead(), Incomplete {
869 private val _active = atomic(if (active) 1 else 0)
870
871 override val isActive: Boolean get() = _active.value != 0
872 override val list: NodeList get() = this
873
874 fun tryMakeActive(): Int {
875 if (_active.value != 0) return FALSE
876 if (_active.compareAndSet(0, 1)) return TRUE
877 return RETRY
878 }
879
880 override fun toString(): String = buildString {
881 append("List")
882 append(if (isActive) "{Active}" else "{New}")
883 append("[")
884 var first = true
885 this@NodeList.forEach<JobNode<*>> { node ->
886 if (first) first = false else append(", ")
887 append(node)
888 }
889 append("]")
890 }
891}
892
893private class InvokeOnCompletion(
894 job: Job,
895 private val handler: CompletionHandler
896) : JobNode<Job>(job) {
897 override fun invoke(cause: Throwable?) = handler.invoke(cause)
898 override fun toString() = "InvokeOnCompletion[$classSimpleName@$hexAddress]"
899}
900
901private class ResumeOnCompletion(
902 job: Job,
903 private val continuation: Continuation<Unit>
904) : JobNode<Job>(job) {
905 override fun invoke(cause: Throwable?) = continuation.resume(Unit)
906 override fun toString() = "ResumeOnCompletion[$continuation]"
907}
908
909internal class DisposeOnCompletion(
910 job: Job,
911 private val handle: DisposableHandle
912) : JobNode<Job>(job) {
913 override fun invoke(cause: Throwable?) = handle.dispose()
914 override fun toString(): String = "DisposeOnCompletion[$handle]"
915}
916
917private class SelectJoinOnCompletion<R>(
918 job: JobSupport,
919 private val select: SelectInstance<R>,
920 private val block: suspend () -> R
921) : JobNode<JobSupport>(job) {
922 override fun invoke(cause: Throwable?) {
923 if (select.trySelect(null))
924 block.startCoroutineCancellable(select.completion)
925 }
926 override fun toString(): String = "SelectJoinOnCompletion[$select]"
927}
928
929private class SelectAwaitOnCompletion<T, R>(
930 job: JobSupport,
931 private val select: SelectInstance<R>,
932 private val block: suspend (T) -> R
933) : JobNode<JobSupport>(job) {
934 override fun invoke(cause: Throwable?) {
935 if (select.trySelect(null))
936 job.selectAwaitCompletion(select, block)
937 }
938 override fun toString(): String = "SelectAwaitOnCompletion[$select]"
939}
940
941// -------- invokeOnCancellation nodes
942
943/**
944 * Marker for node that shall be invoked on cancellation (in _cancelling_ state).
945 * **Note: may be invoked multiple times during cancellation.**
946 */
947internal abstract class JobCancellationNode<out J : Job>(job: J) : JobNode<J>(job)
948
949private class InvokeOnCancellation(
950 job: Job,
951 private val handler: CompletionHandler
952) : JobCancellationNode<Job>(job) {
953 // delegate handler shall be invoked at most once, so here is an additional flag
954 private val _invoked = atomic(0)
955 override fun invoke(cause: Throwable?) {
956 if (_invoked.compareAndSet(0, 1)) handler.invoke(cause)
957 }
958 override fun toString() = "InvokeOnCancellation[$classSimpleName@$hexAddress]"
959}
960
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300961internal class ChildJob(
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300962 parent: JobSupport,
963 @JvmField val childJob: Job
964) : JobCancellationNode<JobSupport>(parent) {
965 override fun invoke(cause: Throwable?) {
966 // Always materialize the actual instance of parent's completion exception and cancel child with it
967 childJob.cancel(job.getCancellationException())
968 }
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300969 override fun toString(): String = "ChildJob[$childJob]"
970}
971
972// Same as ChildJob, but for cancellable continuation
973internal class ChildContinuation(
974 parent: Job,
975 @JvmField val child: AbstractContinuation<*>
976) : JobCancellationNode<Job>(parent) {
977 override fun invoke(cause: Throwable?) {
978 child.cancel(job.getCancellationException())
979 }
980 override fun toString(): String = "ChildContinuation[$child]"
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300981}
982
983private class ChildCompletion(
984 private val parent: JobSupport,
Vsevolod Tolstopyatovf3a50132018-04-16 19:41:20 +0300985 private val child: ChildJob,
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300986 private val proposedUpdate: Any?
987) : JobNode<Job>(child.childJob) {
988 override fun invoke(cause: Throwable?) {
989 parent.continueCompleting(child, proposedUpdate)
990 }
991}