blob: 63bc6bb4fb49cb07110e4a39a57fc3107e8ac3a1 [file] [log] [blame]
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03001/*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03005package kotlinx.coroutines.experimental
6
7import kotlinx.atomicfu.*
8import kotlinx.coroutines.experimental.internal.*
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03009import kotlinx.coroutines.experimental.intrinsics.*
10import kotlinx.coroutines.experimental.selects.*
11import kotlin.coroutines.experimental.*
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030012
13/**
14 * A concrete implementation of [Job]. It is optionally a child to a parent job.
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030015 *
16 * This is an open class designed for extension by more specific classes that might augment the
 * state and may store additional state information for completed jobs, like their result values.
18 *
19 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
20 * @suppress **This is unstable API and it is subject to change.**
21 */
Roman Elizarovf7a63342018-09-25 00:26:25 +030022internal open class JobSupport constructor(active: Boolean) : Job, ChildJob, SelectClause0 {
/** Context key of this element: always [Job]. */
final override val key: CoroutineContext.Key<*>
    get() = Job
24
25 /*
26 === Internal states ===
27
       name       state class              public state  description
       ------     ------------             ------------  -----------
       EMPTY_N    EmptyNew               : New           no listeners
       EMPTY_A    EmptyActive            : Active        no listeners
       SINGLE     JobNode                : Active        a single listener
       SINGLE+    JobNode                : Active        a single listener + NodeList added as its next
       LIST_N     InactiveNodeList       : New           a list of listeners (promoted once, does not go back to EmptyNew)
       LIST_A     NodeList               : Active        a list of listeners (promoted once, does not go back to JobNode/EmptyActive)
       COMPLETING Finishing              : Completing    has a list of listeners (promoted once from LIST_*)
       CANCELLING Finishing              : Cancelling    -- " --
       FINAL_C    Cancelled              : Cancelled     Cancelled (final state)
       FINAL_R    <any>                  : Completed     produced some result
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030040
41 === Transitions ===
42
           New states      Active states       Inactive states

          +---------+       +---------+                          }
          | EMPTY_N | ----> | EMPTY_A | ----+                    } Empty states
          +---------+       +---------+     |                    }
               |  |           |     ^       |    +----------+
               |  |           |     |       +--> |  FINAL_* |
               |  |           V     |       |    +----------+
               |  |         +---------+     |                    }
               |  |         | SINGLE  | ----+                    } JobNode states
               |  |         +---------+     |                    }
               |  |              |          |                    }
               |  |              V          |                    }
               |  |         +---------+     |                    }
               |  +-------> | SINGLE+ | ----+                    }
               |            +---------+     |                    }
               |                 |          |
               V                 V          |
          +---------+       +---------+     |                    }
          | LIST_N  | ----> | LIST_A  | ----+                    } [Inactive]NodeList states
          +---------+       +---------+     |                    }
             |   |             |   |        |
             |   |    +--------+   |        |
             |   |    |            V        |
             |   |    |    +------------+   |   +------------+   }
             |   +-------> | COMPLETING | --+-- | CANCELLING |   } Finishing states
             |        |    +------------+       +------------+   }
             |        |         |                    ^
             |        |         |                    |
             +--------+---------+--------------------+
73
74
75 This state machine and its transition matrix are optimized for the common case when job is created in active
Roman Elizarovecbc85c2018-09-14 12:52:50 +030076 state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
77 successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
78 state without going to COMPLETING state)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +030079
80 Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
Roman Elizarovff0aab82018-09-22 17:11:53 +030081
82 ---------- TIMELINE of state changes and notification in Job lifecycle ----------
83
       | The longest possible chain of events is shown, shorter versions cut-through intermediate states,
85 | while still performing all the notifications in this order.
86
Roman Elizarov6685fd02018-09-25 13:23:53 +030087 + Job object is created
       ## NEW: state == EMPTY_NEW | is InactiveNodeList
Roman Elizarov6685fd02018-09-25 13:23:53 +030089 + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
90 ~ waits for start
91 >> start / join / await invoked
Roman Elizarovff0aab82018-09-22 17:11:53 +030092 ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
Roman Elizarov6685fd02018-09-25 13:23:53 +030093 + onStartInternal / onStart (lazy coroutine is started)
94 ~ active coroutine is working (or scheduled to execution)
95 >> childCancelled / cancelImpl invoked
96 ## CANCELLING: state is Finishing, state.rootCause != null
97 ------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancellation=true) returns NonDisposableHandle
98 ------ new children get immediately cancelled, but are still admitted to the list
99 + onCancellation
100 + notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception)
101 + cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too)
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +0300102 ~ waits for completion of coroutine body
103 >> makeCompleting / makeCompletingOnce invoked
Roman Elizarovff0aab82018-09-22 17:11:53 +0300104 ## COMPLETING: state is Finishing, state.isCompleting == true
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +0300105 ------ new children are not admitted anymore, attachChild returns NonDisposableHandle
Roman Elizarov6685fd02018-09-25 13:23:53 +0300106 ~ waits for children
107 >> last child completes
108 - computes the final exception
Roman Elizarovff0aab82018-09-22 17:11:53 +0300109 ## SEALED: state is Finishing, state.isSealed == true
Roman Elizarov6685fd02018-09-25 13:23:53 +0300110 ------ cancel/childCancelled returns false (cannot handle exceptions anymore)
111 + cancelParent (final exception is communicated to the parent, parent incorporates it)
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +0300112 + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
Roman Elizarov6685fd02018-09-25 13:23:53 +0300113 ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
114 ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
Vsevolod Tolstopyatovf157cec2018-09-24 17:22:06 +0300115 + parentHandle.dispose
116 + notifyCompletion (invoke all completion listeners)
117 + onCompletionInternal / onCompleted / onCompletedExceptionally
Roman Elizarovff0aab82018-09-22 17:11:53 +0300118
119 ---------------------------------------------------------------------------------
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300120 */
121
// Note: the shared EMPTY_NEW / EMPTY_ACTIVE objects are used for as long as we have no listeners
private val _state = atomic<Any?>(if (!active) EMPTY_NEW else EMPTY_ACTIVE)

// Registration of this job in its parent; stays null until initParentJobInternal assigns it
@Volatile
private var parentHandle: ChildHandle? = null
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300127
128 // ------------ initialization ------------
129
/**
 * Initializes the parent job by attaching this job to [parent] as its child, when a parent is given.
 * It shall be invoked at most once after construction after all other initialization.
 *
 * @param parent the parent job, or `null` when this job has no parent.
 * @throws IllegalStateException if the parent handle has already been initialized.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun initParentJobInternal(parent: Job?) {
    check(parentHandle == null)
    if (parent != null) {
        parent.start() // make sure the parent is started
        @Suppress("DEPRECATION")
        val registration = parent.attachChild(this)
        parentHandle = registration
        // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
        if (isCompleted) {
            registration.dispose()
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
    } else {
        // no parent -- there is nothing to dispose on completion
        parentHandle = NonDisposableHandle
    }
}
151
152 // ------------ state query ------------
153
/**
 * Returns current state of this job.
 * Any atomic operation descriptor found in the state is performed first, and the state is then read again.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal val state: Any?
    get() {
        _state.loop { current -> // helper loop on state (complete in-progress atomic operations)
            if (current is OpDescriptor) {
                current.perform(this)
            } else {
                return current
            }
        }
    }
164
/**
 * Invokes [block] with the current [state] over and over again.
 * The loop itself never terminates, so [block] has to leave it with a non-local `return`.
 * @suppress **This is unstable API and it is subject to change.**
 */
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
    while (true) block(state)
}
173
// `true` only while the current state is an Incomplete one that reports itself as active
public override val isActive: Boolean
    get() = (state as? Incomplete)?.isActive == true
178
// The job is completed as soon as its state is no longer an Incomplete one
public final override val isCompleted: Boolean
    get() {
        val current = state
        return current !is Incomplete
    }
180
// Cancelled means: completed exceptionally, or still finishing while already marked as cancelling
public final override val isCancelled: Boolean
    get() {
        val current = state
        return when (current) {
            is CompletedExceptionally -> true
            is Finishing -> current.isCancelling
            else -> false
        }
    }
185
186 // ------------ state update ------------
187
// Finalizes Finishing -> Completed (terminal state) transition.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
//
// Order of actions: seal the state and compute the final exception (under the lock on the state),
// build the final state object, report the exception to the parent (or handle it here),
// CAS to the final state, then run all post-completion actions. Always returns `true`.
private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
    // consistency checks
    require(proposedUpdate !is Incomplete) // only incomplete -> completed transition is allowed
    require(this.state === state) // it cannot change
    require(!state.isSealed) // cannot be sealed yet
    require(state.isCompleting) // must be marked as completing
    val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
    // Create the final exception and seal the state so that no more exceptions can be added
    var suppressed = false
    val finalException = synchronized(state) {
        val exceptions = state.sealLocked(proposedException)
        getFinalRootCause(state, exceptions)?.also { finalCause ->
            // Report suppressed exceptions if initial cause doesn't match final cause (due to JCE unwrapping)
            suppressed = suppressExceptions(finalCause, exceptions) || finalCause !== state.rootCause
        }
    }
    // Create the final state object
    val finalState =
        if (finalException == null || finalException === proposedException) {
            // either the job was not cancelled (no exception), or -- as a small optimization --
            // the proposedUpdate object can be used as is on cancellation
            proposedUpdate
        } else {
            // cancelled job final state
            CompletedExceptionally(finalException)
        }
    // Now handle exception if parent can't handle it
    if (finalException != null) {
        val handledByParent = cancelParent(finalException)
        if (!handledByParent) handleJobException(finalException)
    }
    // Then CAS to completed state -> it must succeed
    require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
    // And process all post-completion actions
    completeStateFinalization(state, finalState, mode, suppressed)
    return true
}
224
// Picks the exception that becomes the final cause of the job out of the collected exceptions.
// Returns `null` only when nothing was collected and the job is not cancelling.
private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
    // A case of no exceptions
    if (exceptions.isEmpty()) {
        // materialize cancellation exception if it was not materialized yet
        return if (state.isCancelling) createJobCancellationException() else null
    }
    /*
     * This is a place where we step on our API limitation:
     * We can't distinguish internal JobCancellationException from our parent
     * from external cancellation, thus we ought to collect all exceptions.
     *
     * But it has negative consequences: same exception can be added as suppressed more than once.
     * Consider concurrent parent-child relationship:
     * 1) Child throws E1 and parent throws E2.
     * 2) Parent goes to "Cancelling(E1)" and cancels child with E1
     * 3) Child goes to "Cancelling(E1)", but throws an exception E2
     * 4) When child throws, it notifies parent that it is cancelled, adding its exception to parent's list of exceptions
     * 5) Child builds final exception: E1 with suppressed E2, reports it to parent.
     * 6) Parent aggregates three exceptions: original E1, reported E2 and "final" E1.
     *    It filters the third exception, but adds the second one to the first one, thus adding suppressed duplicate.
     *
     * Note that it's only happening when both parent and child throw exception simultaneously.
     */
    val first = exceptions[0]
    if (first !is CancellationException) return first
    // Preference order: what the first exception unwraps to, then the first exception in the list
    // that unwraps to anything at all, then the first exception itself.
    // NOTE(review): the middle alternative yields the wrapping exception, not its unwrapped cause -- confirm intended.
    return unwrap(first)
        ?: exceptions.firstOrNull { unwrap(it) != null }
        ?: first
}
260
// Adds the unwrapped form of every exception after the first one to rootCause as a suppressed exception.
// Exceptions that unwrap to nothing, to rootCause itself, or to an instance already added are skipped.
// Returns `true` when at least one exception was added.
private fun suppressExceptions(rootCause: Throwable, exceptions: List<Throwable>): Boolean {
    if (exceptions.size <= 1) return false // nothing more to do here
    val alreadyAdded = identitySet<Throwable>(exceptions.size)
    var anySuppressed = false
    for (index in 1..exceptions.lastIndex) {
        val candidate = unwrap(exceptions[index]) ?: continue
        if (candidate === rootCause) continue
        if (!alreadyAdded.add(candidate)) continue
        rootCause.addSuppressedThrowable(candidate)
        anySuppressed = true
    }
    return anySuppressed
}
276
// Follows the cause chain through CancellationException instances.
// Returns the first exception in the chain that is not a CancellationException,
// or `null` when the chain ends with a CancellationException that has no cause.
private tailrec fun unwrap(exception: Throwable): Throwable? {
    if (exception !is CancellationException) return exception
    val cause = exception.cause ?: return null
    return unwrap(cause)
}
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300284
// fast-path method to finalize normally completed coroutines without children
// Returns `false` when the state has changed concurrently and nothing was done.
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
    check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
    check(update !is CompletedExceptionally) // only for normal completion
    val updated = _state.compareAndSet(state, update)
    if (updated) completeStateFinalization(state, update, mode, false)
    return updated
}
293
// Runs the post-completion actions once the job is in its final state `update`; `state` is the state it left.
// suppressed == true when any exceptions were suppressed while building the final completion cause
private fun completeStateFinalization(state: Incomplete, update: Any?, mode: Int, suppressed: Boolean) {
    /*
     * Now the job in THE FINAL state. We need to properly handle the resulting state.
     * Order of various invocations here is important.
     *
     * 1) Unregister from parent job.
     */
    val registration = parentHandle // volatile read parentHandle _after_ state was updated
    if (registration != null) {
        registration.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
    val cause = (update as? CompletedExceptionally)?.cause
    /*
     * 2) Invoke onCancellation: for resource cancellation etc.
     *    Only notify if it was not notified yet.
     *    Note: we do not use notifyCancelling here, since we are going to invoke all completion as our next step
     */
    if (!state.isCancelling) onCancellation(cause)
    /*
     * 3) Invoke completion handlers: .join(), callbacks etc.
     *    It's important to invoke them only AFTER exception handling, see #208
     */
    when (state) {
        is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler (common case)
            try {
                state.invoke(cause)
            } catch (ex: Throwable) {
                handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
            }
        }
        else -> state.list?.notifyCompletion(cause)
    }
    /*
     * 4) Invoke onCompletionInternal: onNext(), timeout de-registration etc.
     *    It should be last so all callbacks observe consistent state
     *    of the job which doesn't depend on callback scheduling.
     */
    onCompletionInternal(update, mode, suppressed)
}
333
// Notifies everybody interested that this job is being cancelled with the given cause.
private fun notifyCancelling(list: NodeList, cause: Throwable) {
    // our own side goes first: the onCancellation hook, then all cancelling listeners in the list
    onCancellation(cause)
    notifyHandlers<JobCancellingNode<*>>(list, cause)
    // the parent goes last; this cancellation is tentative -- does not matter if there is no parent
    cancelParent(cause)
}
341
// Invokes the handlers of this list that are JobNode instances, passing them the completion cause
private fun NodeList.notifyCompletion(cause: Throwable?) {
    notifyHandlers<JobNode<*>>(this, cause)
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300344
// Invokes every node of type T in the list with the given cause.
// A throwing handler does not stop the iteration: the first failure is wrapped into
// CompletionHandlerException, later ones are attached to it as suppressed, and the result
// is passed to handleOnCompletionException after all nodes have been invoked.
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
    var failure: Throwable? = null
    list.forEach<T> { node ->
        try {
            node.invoke(cause)
        } catch (ex: Throwable) {
            val firstFailure = failure
            if (firstFailure == null) {
                failure = CompletionHandlerException("Exception in completion handler $node for $this", ex)
            } else {
                firstFailure.addSuppressedThrowable(ex)
            }
        }
    }
    failure?.let { handleOnCompletionException(it) }
}
358
// Returns `true` when this invocation has started the job and `false` when the job was not in a new state
public final override fun start(): Boolean {
    loopOnState { state ->
        val outcome = startInternal(state)
        if (outcome == FALSE) return false
        if (outcome == TRUE) return true
        // any other outcome (RETRY) -- go around the loop with a fresh state
    }
}
367
// returns: RETRY/FALSE/TRUE:
//   FALSE when not new,
//   TRUE  when started
//   RETRY when need to retry
private fun startInternal(state: Any?): Int {
    // figure out the active counterpart of the given new state
    val activeState: Any? = when (state) {
        // EMPTY_X state -- no completion handlers
        is Empty -> if (state.isActive) return FALSE else EMPTY_ACTIVE
        // LIST state -- inactive with a list of completion handlers
        is InactiveNodeList -> state.list
        // not a new state
        else -> return FALSE
    }
    if (!_state.compareAndSet(state, activeState)) return RETRY
    onStartInternal()
    return TRUE
}
388
/**
 * Hook for subclasses to provide the actual [start] action; it does nothing by default.
 * This function is invoked exactly once when non-active coroutine is [started][start].
 */
internal open fun onStartInternal() {
}
394
// Throws IllegalStateException (via `error`) while the job is incomplete and has no root cause recorded
public final override fun getCancellationException(): CancellationException {
    val current = this.state
    if (current is Finishing) {
        val rootCause = current.rootCause ?: error("Job is still new or active: $this")
        return rootCause.toCancellationException("Job is cancelling")
    }
    if (current is Incomplete) error("Job is still new or active: $this")
    if (current is CompletedExceptionally) return current.cause.toCancellationException("Job was cancelled")
    return JobCancellationException("Job has completed normally", null, this)
}
405
// A CancellationException is returned as is; anything else becomes the cause of a new JobCancellationException
private fun Throwable.toCancellationException(message: String): CancellationException {
    if (this is CancellationException) return this
    return JobCancellationException(message, this, this@JobSupport)
}
408
/**
 * Returns the cause that signals the completion of this job -- it returns the original
 * [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
 * This function throws [IllegalStateException] when invoked for a job that has neither
 * [completed][isCompleted] nor is being cancelled yet.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun getCompletionCause(): Throwable? {
    val current = state
    return when (current) {
        is Finishing -> current.rootCause
            ?: error("Job is still new or active: $this")
        is Incomplete -> error("Job is still new or active: $this")
        is CompletedExceptionally -> current.cause
        else -> null
    }
}
426
// Delegates to the three-parameter invokeOnCompletion: completion-only handler, invoked immediately if already complete
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300430
// Delegates to the three-parameter invokeOnCompletion with invokeImmediately = true
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle {
    return invokeOnCompletion(onCancelling = onCancelling, invokeImmediately = true, handler = handler)
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300434
// Delegates to the three-parameter invokeOnCompletion with invokeImmediately = true
@Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(onCancelling_: Boolean, handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onCancelling = onCancelling_, invokeImmediately = true, handler = handler)
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300438
// todo: non-final as a workaround for KT-21968, should be final in the future
//
// Registers the handler in a loop on the job state:
//   - active EMPTY state: the node becomes the state itself (SINGLE);
//   - inactive EMPTY / SINGLE state: the state is promoted to a list state first, then the loop retries;
//   - list states: the node is appended to the list, unless the job is already cancelling and the
//     handler asked for cancellation notifications -- then it may be invoked right away instead;
//   - complete: nothing is registered, the handler is invoked right away when invokeImmediately is set.
public override fun invokeOnCompletion(
    onCancelling: Boolean,
    invokeImmediately: Boolean,
    handler: CompletionHandler
): DisposableHandle {
    var cachedNode: JobNode<*>? = null // the node is created at most once and reused on retries
    loopOnState { state ->
        when (state) {
            is Empty -> { // EMPTY_X state -- no completion handlers
                if (!state.isActive) {
                    // that way we can add listener for non-active coroutine
                    promoteEmptyToNodeList(state)
                } else {
                    // try move to SINGLE state
                    val node = cachedNode ?: makeNode(handler, onCancelling).also { cachedNode = it }
                    if (_state.compareAndSet(state, node)) return node
                }
            }
            is Incomplete -> {
                val nodes = state.list
                if (nodes != null) {
                    var rootCause: Throwable? = null
                    var handle: DisposableHandle = NonDisposableHandle
                    if (onCancelling && state is Finishing) {
                        synchronized(state) {
                            // check if we are installing cancellation handler on job that is being cancelled
                            rootCause = state.rootCause // != null if cancelling job
                            // We add node to the list in two cases --- either the job is not being cancelled
                            // or we are adding a child to a coroutine that is not completing yet
                            if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
                                // Note: add node the list while holding lock on state (make sure it cannot change)
                                val node = cachedNode ?: makeNode(handler, onCancelling).also { cachedNode = it }
                                if (!addLastAtomic(state, nodes, node)) return@loopOnState // retry
                                // just return node if we don't have to invoke handler (not cancelling yet)
                                if (rootCause == null) return node
                                // otherwise handler is invoked immediately out of the synchronized section & handle returned
                                handle = node
                            }
                        }
                    }
                    if (rootCause == null) {
                        val node = cachedNode ?: makeNode(handler, onCancelling).also { cachedNode = it }
                        if (addLastAtomic(state, nodes, node)) return node
                    } else {
                        // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
                        if (invokeImmediately) handler.invokeIt(rootCause)
                        return handle
                    }
                } else { // SINGLE/SINGLE+
                    promoteSingleToNodeList(state as JobNode<*>)
                }
            }
            else -> { // is complete
                // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
                return NonDisposableHandle
            }
        }
    }
}
499
// Produces the list node for the handler. A handler that already is a node of the matching kind is
// reused (after checking that it belongs to this job); otherwise it is wrapped into
// InvokeOnCancelling / InvokeOnCompletion.
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
    if (onCancelling) {
        val cancellingNode = handler as? JobCancellingNode<*>
        if (cancellingNode != null) {
            require(cancellingNode.job === this)
            return cancellingNode
        }
        return InvokeOnCancelling(this, handler)
    }
    val completionNode = handler as? JobNode<*>
    if (completionNode != null) {
        require(completionNode.job === this && completionNode !is JobCancellingNode)
        return completionNode
    }
    return InvokeOnCompletion(this, handler)
}
508
// Appends the node to the list on the condition that the job state is still the `expect` instance
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>): Boolean {
    return list.addLastIf(node) { expect === this.state }
}
511
// Tries to replace an EMPTY state with an empty list state of the same activity:
// NodeList when active, InactiveNodeList otherwise. The outcome of the CAS is ignored.
private fun promoteEmptyToNodeList(state: Empty) {
    val nodes = NodeList()
    _state.compareAndSet(state, if (state.isActive) nodes else InactiveNodeList(nodes))
}
518
// Tries to move the job from SINGLE state to a list state in two steps (SINGLE -> SINGLE+ -> LIST).
private fun promoteSingleToNodeList(state: JobNode<*>) {
    // try to promote it to list (SINGLE+ state)
    state.addOneIfEmpty(NodeList())
    // it must be in SINGLE+ state or state has changed (node could have been removed from state);
    // nextNode is either our NodeList or the one of somebody else who won the race.
    // just attempt converting it to list if state is still the same, then we'll continue lock-free loop
    _state.compareAndSet(state, state.nextNode)
}
527
// Suspends only when the job is not complete yet; otherwise just checks the completion of the caller's context
public final override suspend fun join() {
    if (joinInternal()) return joinSuspend() // slow-path wait
    // fast-path no wait -- do not suspend
    coroutineContext.checkCompletion()
}
535
/**
 * Decides whether [join] has to wait.
 *
 * Loops on the job state: returns `false` as soon as the state is not [Incomplete], and `true`
 * once [startInternal] returns a non-negative value for the current state. A negative value
 * causes another iteration on a freshly read state.
 */
private fun joinInternal(): Boolean {
    loopOnState { state ->
        if (state !is Incomplete) return false // not active anymore (complete) -- no need to wait
        if (startInternal(state) >= 0) return true // wait unless need to retry
    }
}
542
/**
 * Slow path of [join]: suspends the caller and registers a [ResumeOnCompletion] handler on this job
 * through [invokeOnCompletion]. The returned handle is disposed when the continuation is cancelled.
 */
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
    // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
    cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
}
547
/**
 * The select clause for joining this job. The job itself implements [SelectClause0],
 * so this property simply returns `this`.
 */
public final override val onJoin: SelectClause0
    get() = this
550
/**
 * Registers the join clause of this job in the given [select] instance (registerSelectJoin).
 *
 * Loops on the job state:
 * - returns immediately when [select] is already selected;
 * - when the job is no longer [Incomplete], tries to select and, on success, starts [block]
 *   on `select.completion` right away;
 * - when [startInternal] returns `0`, registers a [SelectJoinOnCompletion] handler that is disposed on select;
 * - otherwise retries with a freshly read state.
 *
 * @param select the select instance this clause is registered in.
 * @param block the code to run when this clause is selected.
 */
public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
    // fast-path -- check state and select/return if needed
    loopOnState { state ->
        if (select.isSelected) return
        if (state !is Incomplete) {
            // already complete -- select result
            if (select.trySelect(null)) {
                select.completion.context.checkCompletion() // always check for our completion
                block.startCoroutineUnintercepted(select.completion)
            }
            return
        }
        if (startInternal(state) == 0) {
            // slow-path -- register waiter for completion
            select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(this, select, block).asHandler))
            return
        }
    }
}
571
/**
 * Removes the given listener [node] from this job.
 *
 * The action depends on the current state:
 * - single-listener state: when the state is [node] itself, it is replaced with `EMPTY_ACTIVE`
 *   (retrying if the compare-and-set fails); a different node means there is nothing to remove;
 * - any other [Incomplete] state: [node] is unlinked when the state has a list;
 * - complete state: nothing to do.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun removeNode(node: JobNode<*>) {
    // remove logic depends on the state of the job
    loopOnState { state ->
        when (state) {
            is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
                if (state !== node) return // a different job node --> we were already removed
                // try remove and revert back to empty state
                if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
            }
            is Incomplete -> { // may have a list of completion handlers
                // remove node from the list if there is a list
                if (state.list != null) node.remove()
                return
            }
            else -> return // it is complete and does not have any completion handlers
        }
    }
}
593
/**
 * Returns `true` for jobs that do not have a "body block" to complete and should immediately go into
 * the completing state and start waiting for children when they are cancelled.
 * The default is `false`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open val onCancelComplete: Boolean get() = false
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300601
// External cancel without cause, never invoked implicitly from internal machinery.
// Returns the result of cancel(null).
public override fun cancel(): Boolean =
    cancel(null) // must delegate here, because some classes override cancel(x)
Roman Elizarov27b8f452018-09-20 21:23:41 +0300605
// External cancel with (optional) cause, never invoked implicitly from internal machinery.
// Returns true only when cancelImpl reports success and this job handles its exceptions.
public override fun cancel(cause: Throwable?): Boolean =
    cancelImpl(cause) && handlesException
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300609
// Parent is cancelling child: the parent job itself is passed on as the cause,
// and the result of cancelImpl is ignored.
public final override fun parentCancelled(parentJob: Job) {
    cancelImpl(parentJob)
}
614
// Child was cancelled with cause.
// Returns true only when cancelImpl reports success and this job handles its exceptions.
internal fun childCancelled(cause: Throwable): Boolean =
    cancelImpl(cause) && handlesException
Roman Elizarovf7a63342018-09-25 00:26:25 +0300618
// Common cancellation entry point.
// cause is Throwable or Job when cancelChild was invoked
// returns true if exception was handled, false otherwise
private fun cancelImpl(cause: Any?): Boolean {
    if (onCancelComplete) {
        // make sure it is completing, if cancelMakeCompleting returns true it means it had made it
        // completing and had recorded exception
        if (cancelMakeCompleting(cause)) return true
        // otherwise just record exception via makeCancelling below
    }
    return makeCancelling(cause)
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300630
// Tries to move the job straight into completing state with an exceptional result built from cause.
// Returns false when the job is already complete or completing (no update is proposed in that case),
// true when the job became completed or started waiting for its children.
// cause is Throwable or Job (see createCauseException), it can be null.
private fun cancelMakeCompleting(cause: Any?): Boolean {
    loopOnState { state ->
        if (state !is Incomplete || state is Finishing && state.isCompleting) {
            return false // already completed/completing, do not even propose update
        }
        val proposedUpdate = CompletedExceptionally(createCauseException(cause))
        when (tryMakeCompleting(state, proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
            COMPLETING_ALREADY_COMPLETING -> return false
            COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true
            COMPLETING_RETRY -> return@loopOnState // re-read state and try again
            else -> error("unexpected result")
        }
    }
}
645
// Creates the default cancellation exception for this job: fixed message, no cause, this job as the job.
private fun createJobCancellationException() =
    JobCancellationException("Job was cancelled", null, this)
648
// Materializes the exception that corresponds to the given cancellation cause:
//  - a Throwable is used as is;
//  - null is replaced with a fresh JobCancellationException for this job;
//  - anything else is cast to Job and its getCancellationException() is used.
// cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
private fun createCauseException(cause: Any?): Throwable = when (cause) {
    is Throwable? -> cause ?: createJobCancellationException()
    else -> (cause as Job).getCancellationException()
}
654
// Transitions to Cancelling state.
// cause is Throwable or Job when cancelChild was invoked, cause can be null only on cancel
// Returns false when the job is already complete or its Finishing state is already sealed,
// true when the cause was recorded / the job was made cancelling or completing.
private fun makeCancelling(cause: Any?): Boolean {
    var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
    loopOnState { state ->
        when (state) {
            is Finishing -> { // already finishing -- collect exceptions
                val notifyRootCause = synchronized(state) {
                    if (state.isSealed) return false // too late, already sealed -- cannot add exception nor mark cancelled
                    // add exception, do nothing if parent is cancelling child that is already being cancelled
                    val wasCancelling = state.isCancelling // will notify if was not cancelling
                    // Materialize missing exception if it is the first exception (otherwise -- don't)
                    if (cause != null || !wasCancelling) {
                        val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                        state.addExceptionLocked(causeException)
                    }
                    // take cause for notification if it was not cancelling before
                    state.rootCause.takeIf { !wasCancelling }
                }
                // notification is performed outside of the synchronized block
                notifyRootCause?.let { notifyCancelling(state.list, it) }
                return true
            }
            is Incomplete -> {
                // Not yet finishing -- try to make it cancelling
                val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                if (state.isActive) {
                    // active state becomes cancelling
                    if (tryMakeCancelling(state, causeException)) return true
                } else {
                    // non active state starts completing
                    when (tryMakeCompleting(state, CompletedExceptionally(causeException), mode = MODE_ATOMIC_DEFAULT)) {
                        COMPLETING_ALREADY_COMPLETING -> error("Cannot happen in $state")
                        COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true // ok
                        COMPLETING_RETRY -> return@loopOnState
                        else -> error("unexpected result")
                    }
                }
            }
            else -> return false // already complete
        }
    }
}
697
// Performs promotion of incomplete coroutine state to NodeList for the purpose of
// converting coroutine state to Cancelling, returns null when need to retry.
// Returns the state's own list when it has one.
private fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list
    ?: when (state) {
        is Empty -> NodeList() // we can allocate new empty list that'll get integrated into Cancelling state
        is JobNode<*> -> {
            // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
            // correctly capture a reference to it
            promoteSingleToNodeList(state)
            null // retry
        }
        else -> error("State should have list: $state")
    }
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300711
// Try make new Cancelling state on the condition that we're still in the expected state.
// Returns false when the caller has to retry (list promotion is pending or the state has changed),
// true when the state was replaced and cancelling listeners were notified.
// Throws IllegalStateException when invoked with a Finishing or a non-active state.
private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
    check(state !is Finishing) // only for non-finishing states
    check(state.isActive) // only for active states
    // get state's list or else promote to list to correctly operate on child lists
    val list = getOrPromoteCancellingList(state) ?: return false
    // Create cancelling state (with rootCause!)
    val cancelling = Finishing(list, false, rootCause)
    if (!_state.compareAndSet(state, cancelling)) return false
    // Notify listeners
    notifyCancelling(list, rootCause)
    return true
}
725
/**
 * This function is used by [CompletableDeferred.complete] (and exceptionally) and by [JobImpl.cancel].
 * It returns `false` on repeated invocation (when this job is already completing) and `true` when
 * the job became completed or started waiting for its children. The attempt is retried on a freshly
 * read state for as long as [tryMakeCompleting] asks for a retry.
 *
 * @param proposedUpdate the proposed final state of the job.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun makeCompleting(proposedUpdate: Any?): Boolean = loopOnState { state ->
    when (tryMakeCompleting(state, proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
        COMPLETING_ALREADY_COMPLETING -> return false
        COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true
        COMPLETING_RETRY -> return@loopOnState
        else -> error("unexpected result")
    }
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300740
/**
 * This function is used by [AbstractCoroutine.resume].
 * It throws exception on repeated invocation (when this job is already completing).
 *
 * Returns:
 * * `true` if state was updated to completed/cancelled;
 * * `false` if made completing or it is cancelling and is waiting for children.
 *
 * @param proposedUpdate the proposed final state of the job.
 * @param mode completion mode that is passed through to [tryMakeCompleting].
 * @throws IllegalStateException if job is already complete or completing; the exception carried by
 * [proposedUpdate] (if any) is attached as its cause.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean = loopOnState { state ->
    when (tryMakeCompleting(state, proposedUpdate, mode)) {
        COMPLETING_ALREADY_COMPLETING -> throw IllegalStateException("Job $this is already complete or completing, " +
            "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
        COMPLETING_COMPLETED -> return true
        COMPLETING_WAITING_CHILDREN -> return false
        COMPLETING_RETRY -> return@loopOnState
        else -> error("unexpected result")
    }
}
762
// Makes one attempt to move the job from the given state into completing state with proposedUpdate.
// Returns one of:
//  - COMPLETING_ALREADY_COMPLETING when state is not Incomplete or is already marked as completing;
//  - COMPLETING_COMPLETED when the final state was installed;
//  - COMPLETING_WAITING_CHILDREN when the job has started waiting for a child;
//  - COMPLETING_RETRY when the caller has to re-read the state and try again.
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?, mode: Int): Int {
    if (state !is Incomplete)
        return COMPLETING_ALREADY_COMPLETING
    /*
     * FAST PATH -- no children to wait for && simple state (no list) && not cancelling => can complete immediately
     * Cancellation (failures) always have to go through Finishing state to serialize exception handling.
     * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
     * which may miss unhandled exception.
     */
    if ((state is Empty || state is JobNode<*>) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
        if (!tryFinalizeSimpleState(state, proposedUpdate, mode)) return COMPLETING_RETRY
        return COMPLETING_COMPLETED
    }
    // get state's list or else promote to list to correctly operate on child lists
    val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
    // promote to Finishing state if we are not in it yet
    // This promotion has to be atomic w.r.t. state change, so that a coroutine that is not active yet
    // atomically transitions to finishing & completing state
    val finishing = state as? Finishing ?: Finishing(list, false, null)
    // must synchronize updates to finishing state
    var notifyRootCause: Throwable? = null
    synchronized(finishing) {
        // check if this state is already completing
        if (finishing.isCompleting) return COMPLETING_ALREADY_COMPLETING
        // mark as completing
        finishing.isCompleting = true
        // if we need to promote to finishing then atomically do it here.
        // We do it as early as possible while still holding the lock. This ensures that we cancelImpl asap
        // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
        if (finishing !== state) {
            if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
        }
        // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
        require(!finishing.isSealed) // cannot be sealed
        // add new proposed exception to the finishing state
        val wasCancelling = finishing.isCancelling
        (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
        // If it just becomes cancelling --> must process cancelling notifications
        notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
    }
    // process cancelling notification here -- it cancels all the children _before_ we start to wait for them (sic!!!)
    notifyRootCause?.let { notifyCancelling(list, it) }
    // now wait for children
    val child = firstChild(state)
    if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
        return COMPLETING_WAITING_CHILDREN
    // otherwise -- we have no children left (all were already cancelled?)
    if (tryFinalizeFinishingState(finishing, proposedUpdate, mode))
        return COMPLETING_COMPLETED
    // otherwise retry
    return COMPLETING_RETRY
}
815
// The cause of the receiver when it is a CompletedExceptionally state, null for any other value.
private val Any?.exceptionOrNull: Throwable?
    get() = (this as? CompletedExceptionally)?.cause
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300818
// Finds the first child to wait for: the state itself when it is a ChildHandleNode,
// otherwise the first child node found in the state's list (null when there is no list or no child).
private fun firstChild(state: Incomplete) =
    state as? ChildHandleNode ?: state.list?.nextChild()
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300821
// Installs a ChildCompletion handler on the given child (without immediate invocation) and, when the
// returned handle is NonDisposableHandle, moves on to the next child in the list.
// returns true when a handler was installed on a child that is still being waited for,
// return false when there is no more incomplete children to wait
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
    val handle = child.childJob.invokeOnCompletion(
        invokeImmediately = false,
        handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
    )
    if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
    val nextChild = child.nextChild() ?: return false
    return tryWaitForChild(state, nextChild, proposedUpdate)
}
833
// Invoked (via ChildCompletion) when the child that was being waited for has completed:
// continues with the next child or, when none is left, tries to finalize the Finishing state.
// Throws IllegalArgumentException when the job is no longer in the given state.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
    require(this.state === state) // consistency check -- it cannot change while we are waiting for children
    // figure out if we need to wait for next child
    val waitChild = lastChild.nextChild()
    // try wait for next child
    if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
    // no more children to wait -- try update state (the outcome is not acted upon here)
    tryFinalizeFinishingState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)
}
844
// Finds the next ChildHandleNode that follows the receiver in its list, skipping removed nodes.
// Returns null when the list head (NodeList) is reached without finding a child.
private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? {
    var cur = this
    while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
    while (true) {
        cur = cur.nextNode
        if (cur.isRemoved) continue
        if (cur is ChildHandleNode) return cur
        if (cur is NodeList) return null // checked all -- no more children
    }
}
855
/**
 * A sequence of the child jobs attached to this job.
 *
 * The job state is read once when the sequence is iterated: a single [ChildHandleNode] state yields
 * its child job, any other [Incomplete] state yields the child jobs of all [ChildHandleNode] entries
 * in its list (if there is a list), and a complete state yields nothing.
 */
public final override val children: Sequence<Job> get() = buildSequence {
    val state = this@JobSupport.state
    when (state) {
        is ChildHandleNode -> yield(state.childJob)
        is Incomplete -> state.list?.let { list ->
            list.forEach<ChildHandleNode> { yield(it.childJob) }
        }
    }
}
865
/**
 * Attaches [child] to this job by installing a [ChildHandleNode] as a cancelling handler
 * and returns that node as the [ChildHandle].
 */
@Suppress("OverridingDeprecatedMember")
public final override fun attachChild(child: ChildJob): ChildHandle {
    /*
     * Note: This function attaches a special ChildHandleNode node object. This node object
     * is handled in a special way on completion of the coroutine (we wait for all of them) and
     * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
     * if the job is already cancelling. For cancelling state child is attached under state lock.
     * It's required to properly wait for all children before completion and provide linearizable hierarchy view:
     * If child is attached when job is already being cancelled, such child will receive immediate notification on
     * cancellation, but parent *will* wait for that child before completion and will handle its exception.
     */
    return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300879
/**
 * Cancels the children of this job with the given [cause] by delegating to the
 * `cancelChildren` extension function.
 */
// NOTE(review): the call below is meant to resolve to the extension function rather than to this
// member; presumably the overridden member is hidden from resolution -- confirm against the Job declaration.
@Suppress("OverridingDeprecatedMember")
public final override fun cancelChildren(cause: Throwable?) {
    this.cancelChildren(cause) // use extension function
}
884
/**
 * Override to process any exceptions that were encountered while invoking completion handlers
 * installed via [invokeOnCompletion]. The default implementation rethrows [exception].
 *
 * @param exception the exception that was encountered.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun handleOnCompletionException(exception: Throwable) {
    throw exception
}
893
/**
 * This function is invoked once when job is being cancelled, fails, or is completed.
 * It's an optimization for [invokeOnCompletion] with `onCancelling` set to `true`.
 * The default implementation does nothing.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onCancellation(cause: Throwable?) {}
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300901
/**
 * When this property is `true` the parent is cancelled on cancellation of this job.
 * Note, that [CancellationException] is considered "normal" and parent is not cancelled when child produces it.
 * This allows parent to cancel its children (normally) without being cancelled itself, unless
 * child crashes and produces some other exception during its completion.
 * The default is `false`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open val cancelsParent: Boolean get() = false
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300911
/**
 * Returns `true` for jobs that handle their exceptions via [handleJobException] or integrate them
 * into the job's result via [onCompletionInternal]. The only instance of the [Job] that does not
 * handle its exceptions is [JobImpl]. The default is `true`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open val handlesException: Boolean get() = true
Roman Elizarov67912f92018-09-16 01:46:43 +0300920
/**
 * This method is invoked **exactly once** when the final exception of the job is determined
 * and before it becomes complete. At the moment of invocation the job and all its children are complete.
 * The default implementation does nothing.
 *
 * @param exception the final exception of the job.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun handleJobException(exception: Throwable) {}
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300928
// Propagates cancellation of this job to its parent.
// Returns true when cause is a CancellationException (the parent is not touched in that case) or when
// the parent handle reports true from childCancelled; returns false when this job does not cancel
// its parent or when there is no parent handle.
private fun cancelParent(cause: Throwable): Boolean {
    // CancellationException is considered "normal" and parent is not cancelled when child produces it.
    // This allows parent to cancel its children (normally) without being cancelled itself, unless
    // child crashes and produces some other exception during its completion.
    if (cause is CancellationException) return true
    if (!cancelsParent) return false
    return parentHandle?.childCancelled(cause) == true
}
937
/**
 * Override for post-completion actions that need to do something with the state.
 * The default implementation does nothing.
 *
 * @param state the final state.
 * @param mode completion mode.
 * @param suppressed true when any exceptions were suppressed while building the final completion cause.
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300946
// for nicer debugging: renders as name{state}@hexAddress
public override fun toString(): String =
    "${nameString()}{${stateString()}}@$hexAddress"
950
/**
 * The name part of the [toString] representation; `classSimpleName` by default.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal open fun nameString(): String = classSimpleName
955
// The state part of the toString representation. The state is read once:
//  - Finishing: "Cancelling" or "Active", followed by "Completing" when it is completing;
//  - any other Incomplete: "Active" or "New";
//  - CompletedExceptionally: "Cancelled";
//  - anything else: "Completed".
private fun stateString(): String {
    val state = this.state
    return when (state) {
        is Finishing -> buildString {
            append(if (state.isCancelling) "Cancelling" else "Active")
            if (state.isCompleting) append("Completing")
        }
        is Incomplete -> if (state.isActive) "Active" else "New"
        is CompletedExceptionally -> "Cancelled"
        else -> "Completed"
    }
}
971
Roman Elizarov852fae52018-09-25 20:13:24 +0300972 // Completing & Cancelling states,
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300973 // All updates are guarded by synchronized(this), reads are volatile
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +0300974 @Suppress("UNCHECKED_CAST")
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +0300975 private class Finishing(
976 override val list: NodeList,
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300977 @Volatile
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300978 @JvmField var isCompleting: Boolean,
979 @Volatile
980 @JvmField var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
Roman Elizarov4e33cc62018-09-05 01:15:44 +0300981 ) : SynchronizedObject(), Incomplete {
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300982 @Volatile
983 private var _exceptionsHolder: Any? = null // Contains null | Throwable | ArrayList | SEALED
Vsevolod Tolstopyatov06f57aa2018-07-24 19:51:21 +0300984
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300985 // NotE: cannot be modified when sealed
986 val isSealed: Boolean get() = _exceptionsHolder === SEALED
Roman Elizarov6685fd02018-09-25 13:23:53 +0300987 val isCancelling: Boolean get() = rootCause != null
988 override val isActive: Boolean get() = rootCause == null // !isCancelling
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300989
990 // Seals current state and returns list of exceptions
Roman Elizarov563da402018-08-10 19:18:56 +0300991 // guarded by `synchronized(this)`
Roman Elizarovecbc85c2018-09-14 12:52:50 +0300992 fun sealLocked(proposedException: Throwable?): List<Throwable> {
993 val eh = _exceptionsHolder // volatile read
994 val list = when(eh) {
995 null -> allocateList()
996 is Throwable -> allocateList().also { it.add(eh) }
997 is ArrayList<*> -> eh as ArrayList<Throwable>
998 else -> error("State is $eh") // already sealed -- cannot happen
999 }
1000 val rootCause = this.rootCause // volatile read
1001 rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
1002 if (proposedException != null && proposedException != rootCause) list.add(proposedException)
1003 _exceptionsHolder = SEALED
1004 return list
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +03001005 }
1006
/**
 * Records [exception] in this state.
 *
 * The first exception ever added becomes [rootCause]. Later exceptions that are not the same
 * instance as the root cause are kept in `_exceptionsHolder`, which holds `null`, then a single
 * [Throwable], then an [ArrayList] of them as more distinct instances arrive.
 *
 * Guarded by `synchronized(this)`.
 */
fun addExceptionLocked(exception: Throwable) {
    val cause = rootCause // volatile read
    if (cause == null) {
        rootCause = exception
        return
    }
    if (exception === cause) return // nothing to do
    val holder = _exceptionsHolder // volatile read
    when (holder) {
        null -> _exceptionsHolder = exception
        is Throwable -> {
            if (exception === holder) return // nothing to do
            // promote the single stored exception to a list holding both
            val promoted = allocateList()
            promoted.add(holder)
            promoted.add(exception)
            _exceptionsHolder = promoted
        }
        is ArrayList<*> -> (holder as ArrayList<Throwable>).add(exception)
        else -> error("State is $holder") // already sealed -- cannot happen
    }
}
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001030
/** Creates an empty exception list with an initial capacity of 4. */
private fun allocateList(): ArrayList<Throwable> = ArrayList(4)
1032
/** Debug representation listing the flags, root cause, exceptions holder and listener list. */
override fun toString(): String {
    return "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$_exceptionsHolder, list=$list]"
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001035 }
1036
/** `true` only when this incomplete state is a [Finishing] instance whose own `isCancelling` is `true`. */
private val Incomplete.isCancelling: Boolean
    get() = (this as? Finishing)?.isCancelling == true
1039
/**
 * Used by parent that is waiting for child completion.
 *
 * The node is created for the child's job (`child.childJob`); when invoked it calls
 * `continueCompleting` on [parent] with the captured [state], [child] and [proposedUpdate].
 */
private class ChildCompletion(
    private val parent: JobSupport,
    private val state: Finishing,
    private val child: ChildHandleNode,
    private val proposedUpdate: Any?
) : JobNode<Job>(child.childJob) {

    override fun invoke(cause: Throwable?) {
        parent.continueCompleting(state, child, proposedUpdate)
    }

    override fun toString(): String = "ChildCompletion[$child, $proposedUpdate]"
}
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001053
/*
 * =================================================================================================
 * This is a ready-to-use implementation of the Deferred interface.
 * However, it is not type-safe. Conceptually, it just exposes the value of the underlying
 * completed state as `Any?`.
 * =================================================================================================
 */
1061
/** `true` when the current [state] is an instance of [CompletedExceptionally]. */
public val isCompletedExceptionally: Boolean
    get() = state is CompletedExceptionally
1063
/**
 * Returns `exceptionOrNull` of the final state.
 *
 * @throws IllegalStateException if the current state is still [Incomplete].
 */
public fun getCompletionExceptionOrNull(): Throwable? {
    val finalState = state
    check(finalState !is Incomplete) { "This job has not completed yet" }
    return finalState.exceptionOrNull
}
1069
/**
 * Returns the final state object, or throws the stored cause when the final state is
 * [CompletedExceptionally].
 *
 * @throws IllegalStateException if the current state is still [Incomplete].
 * @suppress **This is unstable API and it is subject to change.**
 */
internal fun getCompletedInternal(): Any? {
    val finalState = state
    check(finalState !is Incomplete) { "This job has not completed yet" }
    return when (finalState) {
        is CompletedExceptionally -> throw finalState.cause
        else -> finalState
    }
}
1079
/**
 * Returns the final state object, suspending when the job is not complete yet.
 * Throws the stored cause when the final state is [CompletedExceptionally].
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
internal suspend fun awaitInternal(): Any? {
    // fast-path -- check state (avoid extra object creation)
    while (true) { // lock-free loop on state
        val current = state
        if (current is Incomplete) {
            // not complete yet -- leave the loop unless startInternal asks for a retry (negative result)
            if (startInternal(current) >= 0) break
            continue
        }
        // already complete -- just return result
        if (current is CompletedExceptionally) throw current.cause
        return current
    }
    return awaitSuspend() // slow-path
}
1097
/**
 * Slow path of [awaitInternal]: suspends and registers a completion handler that resumes the
 * continuation with the final state, or with the stored cause when that state is
 * [CompletedExceptionally].
 */
private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
    // We have to invoke await() handler only on cancellation, on completion we will be resumed regularly without handlers
    val completionHandle = invokeOnCompletion {
        val finalState = this.state
        check(finalState !is Incomplete)
        when (finalState) {
            is CompletedExceptionally -> cont.resumeWithException(finalState.cause)
            else -> cont.resume(finalState)
        }
    }
    cont.disposeOnCancellation(completionHandle)
}
1109
/**
 * Registers an await-style select clause on this job.
 *
 * When the job is already complete the clause is selected right away: [block] is started with
 * the final state, or the select is resumed with the stored cause when the final state is
 * [CompletedExceptionally]. Otherwise a [SelectAwaitOnCompletion] handler is installed and
 * disposed on select.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
// registerSelectAwaitInternal
@Suppress("UNCHECKED_CAST")
internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) {
    // fast-path -- check state and select/return if needed
    loopOnState { state ->
        if (select.isSelected) return
        if (state is Incomplete) {
            if (startInternal(state) == 0) {
                // slow-path -- register waiter for completion
                val waiter = SelectAwaitOnCompletion(this, select, block)
                select.disposeOnSelect(invokeOnCompletion(handler = waiter.asHandler))
                return
            }
            // any other result of startInternal -- loop again on a fresh state
        } else {
            // already complete -- select result
            if (select.trySelect(null)) {
                when (state) {
                    is CompletedExceptionally -> select.resumeSelectCancellableWithException(state.cause)
                    else -> block.startCoroutineUnintercepted(state as T, select.completion)
                }
            }
            return
        }
    }
}
1136
/**
 * Delivers the current state to an already selected await clause: resumes [select] with the
 * stored cause when the state is [CompletedExceptionally], otherwise starts [block] with the
 * state value.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
@Suppress("UNCHECKED_CAST")
internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
    val finalState = state
    // Note: await is non-atomic (can be cancelled while dispatched)
    when (finalState) {
        is CompletedExceptionally -> select.resumeSelectCancellableWithException(finalState.cause)
        else -> block.startCoroutineCancellable(finalState as T, select.completion)
    }
}
1149}
1150
Roman Elizarov563da402018-08-10 19:18:56 +03001151// --------------- helper classes & constants for job implementation
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001152
// Outcome codes named after the "completing" step.
// NOTE(review): presumably returned by the completion logic earlier in this file -- confirm against its callers.
private const val COMPLETING_ALREADY_COMPLETING: Int = 0
private const val COMPLETING_COMPLETED: Int = 1
private const val COMPLETING_WAITING_CHILDREN: Int = 2
private const val COMPLETING_RETRY: Int = 3

// Tri-state integer codes: retry / false / true.
private const val RETRY: Int = -1
private const val FALSE: Int = 0
private const val TRUE: Int = 1
1161
// Marker stored in the exceptions holder once it has been sealed
private val SEALED = Symbol("SEALED")

// Shared instances of the listener-free states: one for the "new" state, one for the "active" state
private val EMPTY_NEW = Empty(isActive = false)
private val EMPTY_ACTIVE = Empty(isActive = true)
Vsevolod Tolstopyatovc9afb672018-07-24 20:30:48 +03001166
/**
 * Incomplete state that has no listener list; [isActive] tells whether it represents
 * the active (`true`) or the new (`false`) variant.
 */
private class Empty(override val isActive: Boolean) : Incomplete {
    override val list: NodeList?
        get() = null

    override fun toString(): String {
        val label = if (isActive) "Active" else "New"
        return "Empty{$label}"
    }
}
1171
/**
 * [JobSupport] subclass that is constructed with `active = true`, initializes its parent link
 * from the optional [parent] argument, and overrides `onCancelComplete` to `true` and
 * `handlesException` to `false`.
 */
internal class JobImpl(parent: Job? = null) : JobSupport(active = true) {
    init {
        initParentJobInternal(parent)
    }

    override val onCancelComplete: Boolean
        get() = true

    override val handlesException: Boolean
        get() = false
}
1177
1178// -------- invokeOnCompletion nodes
1179
/**
 * Common shape of the state objects that represent a job which has not completed yet.
 */
internal interface Incomplete {
    /** Whether this state counts as active. */
    val isActive: Boolean

    /** The listener list of this state; is null only for Empty and JobNode incomplete state objects. */
    val list: NodeList?
}
1184
/**
 * Base class for completion-handler nodes attached to a [job].
 *
 * A node is itself an [Incomplete] state that is always active and has no list of its own.
 * Disposing it asks the owning job (cast to [JobSupport]) to remove this node.
 */
internal abstract class JobNode<out J : Job>(
    @JvmField val job: J
) : CompletionHandlerBase(), DisposableHandle, Incomplete {

    override val isActive: Boolean
        get() = true

    override val list: NodeList?
        get() = null

    override fun dispose() = (job as JobSupport).removeNode(this)
}
1192
/**
 * List of [JobNode] listeners that doubles as an active [Incomplete] state; its [list] is itself.
 */
internal class NodeList : LockFreeLinkedListHead(), Incomplete {
    override val isActive: Boolean
        get() = true

    override val list: NodeList
        get() = this

    /**
     * Renders the list as `List{<state>}[node1, node2, ...]`, where [state] is the label
     * placed between the braces.
     */
    fun getString(state: String): String = buildString {
        append("List{")
        append(state)
        append("}[")
        var separator = "" // nothing before the first node, ", " before each subsequent one
        this@NodeList.forEach<JobNode<*>> { node ->
            append(separator)
            append(node)
            separator = ", "
        }
        append("]")
    }

    override fun toString(): String = getString("Active")
}
1211
/**
 * Non-active [Incomplete] state that wraps a [NodeList]; printed with the "New" label.
 */
internal class InactiveNodeList(
    override val list: NodeList
) : Incomplete {
    override val isActive: Boolean
        get() = false

    override fun toString(): String {
        return list.getString("New")
    }
}
1218
/** Node that forwards its invocation, with the same cause, to the wrapped [handler]. */
private class InvokeOnCompletion(
    job: Job,
    private val handler: CompletionHandler
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        handler(cause)
    }

    override fun toString(): String = "InvokeOnCompletion[$classSimpleName@$hexAddress]"
}
1226
/** Node that resumes [continuation] with `Unit` when invoked; the cause is ignored. */
private class ResumeOnCompletion(
    job: Job,
    private val continuation: Continuation<Unit>
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        continuation.resume(Unit)
    }

    override fun toString(): String = "ResumeOnCompletion[$continuation]"
}
1234
/** Node that disposes [handle] when invoked; the cause is ignored. */
internal class DisposeOnCompletion(
    job: Job,
    private val handle: DisposableHandle
) : JobNode<Job>(job) {
    override fun invoke(cause: Throwable?) {
        handle.dispose()
    }

    override fun toString(): String {
        return "DisposeOnCompletion[$handle]"
    }
}
1242
/**
 * Node for a join-style select clause: when invoked it tries to select [select] and,
 * on success, starts [block] with the select's completion.
 */
private class SelectJoinOnCompletion<R>(
    job: JobSupport,
    private val select: SelectInstance<R>,
    private val block: suspend () -> R
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        if (!select.trySelect(null)) return
        block.startCoroutineCancellable(select.completion)
    }

    override fun toString(): String {
        return "SelectJoinOnCompletion[$select]"
    }
}
1254
/**
 * Node for an await-style select clause: when invoked it tries to select [select] and,
 * on success, hands over to `selectAwaitCompletion` of the owning job.
 */
private class SelectAwaitOnCompletion<T, R>(
    job: JobSupport,
    private val select: SelectInstance<R>,
    private val block: suspend (T) -> R
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        if (!select.trySelect(null)) return
        job.selectAwaitCompletion(select, block)
    }

    override fun toString(): String {
        return "SelectAwaitOnCompletion[$select]"
    }
}
1266
1267// -------- invokeOnCancellation nodes
1268
1269/**
Roman Elizarov852fae52018-09-25 20:13:24 +03001270 * Marker for node that shall be invoked on in _cancelling_ state.
Roman Elizarovecbc85c2018-09-14 12:52:50 +03001271 * **Note: may be invoked multiple times.**
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001272 */
Roman Elizarov6685fd02018-09-25 13:23:53 +03001273internal abstract class JobCancellingNode<out J : Job>(job: J) : JobNode<J>(job)
Vsevolod Tolstopyatovd5214782018-04-13 14:51:30 +03001274
/**
 * Cancelling node that forwards to [handler], making sure the handler runs at most once
 * even when the node itself is invoked several times.
 */
private class InvokeOnCancelling(
    job: Job,
    private val handler: CompletionHandler
) : JobCancellingNode<Job>(job) {
    // delegate handler shall be invoked at most once, so here is an additional flag
    private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu

    override fun invoke(cause: Throwable?) {
        // only the caller that flips the flag from 0 to 1 gets to run the handler
        if (!_invoked.compareAndSet(0, 1)) return
        handler.invoke(cause)
    }

    override fun toString(): String = "InvokeOnCancelling[$classSimpleName@$hexAddress]"
}
1286
/**
 * Cancelling node attached to the parent job on behalf of [childJob].
 *
 * When invoked it calls `parentCancelled` on the child, passing the parent job;
 * [childCancelled] delegates to the parent job's `childCancelled`.
 */
internal class ChildHandleNode(
    parent: JobSupport,
    @JvmField val childJob: ChildJob
) : JobCancellingNode<JobSupport>(parent), ChildHandle {

    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)

    override fun childCancelled(cause: Throwable): Boolean {
        return job.childCancelled(cause)
    }

    override fun toString(): String {
        return "ChildHandle[$childJob]"
    }
}
1295
/**
 * Same as ChildHandleNode, but for cancellable continuation: when invoked it cancels [child]
 * with the cancellation exception obtained from the parent job.
 */
internal class ChildContinuation(
    parent: Job,
    @JvmField val child: AbstractContinuation<*>
) : JobCancellingNode<Job>(parent) {

    override fun invoke(cause: Throwable?) {
        val cancellation = job.getCancellationException()
        child.cancel(cancellation)
    }

    override fun toString(): String = "ChildContinuation[$child]"
}
1307