blob: 574060e84f869b256760bbc25a2abae5aa8262f5 [file] [log] [blame]
Roman Elizarova7db8ec2017-12-21 22:45:12 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Roman Elizarove1c0b652017-12-01 14:02:57 +030017package kotlinx.coroutines.experimental
18
19import kotlinx.coroutines.experimental.internal.LinkedListHead
20import kotlinx.coroutines.experimental.internal.LinkedListNode
21import kotlin.coroutines.experimental.CoroutineContext
22import kotlin.coroutines.experimental.buildSequence
23import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
24
25/**
26 * A background job. Conceptually, a job is a cancellable thing with a simple life-cycle that
27 * culminates in its completion. Jobs can be arranged into parent-child hierarchies where cancellation
28 * or completion of parent immediately cancels all its [children].
29 *
30 * The most basic instances of [Job] are created with [launch] coroutine builder or with a
31 * `Job()` factory function. Other coroutine builders and primitives like
32 * [Deferred] also implement [Job] interface.
33 *
34 * A job has the following states:
35 *
36 * | **State** | [isActive] | [isCompleted] | [isCancelled] |
37 * | --------------------------------------- | ---------- | ------------- | ------------- |
38 * | _New_ (optional initial state) | `false` | `false` | `false` |
39 * | _Active_ (default initial state) | `true` | `false` | `false` |
40 * | _Completing_ (optional transient state) | `true` | `false` | `false` |
41 * | _Cancelling_ (optional transient state) | `false` | `false` | `true` |
42 * | _Cancelled_ (final state) | `false` | `true` | `true` |
43 * | _Completed_ (final state) | `false` | `true` | `false` |
44 *
45 * Usually, a job is created in _active_ state (it is created and started). However, coroutine builders
46 * that provide an optional `start` parameter create a coroutine in _new_ state when this parameter is set to
47 * [CoroutineStart.LAZY]. Such a job can be made _active_ by invoking [start] or [join].
48 *
49 * A job can be _cancelled_ at any time with [cancel] function that forces it to transition to
Roman Elizarov4d626de2018-01-11 22:57:28 +030050 * _cancelling_ state immediately. Job that is not backed by a coroutine (see `Job()` function) and does not have
Roman Elizarove1c0b652017-12-01 14:02:57 +030051 * [children] becomes _cancelled_ on [cancel] immediately.
52 * Otherwise, job becomes _cancelled_ when it finishes executing its code and
53 * when all its children [complete][isCompleted].
54 *
55 * ```
56 * wait children
57 * +-----+ start +--------+ complete +-------------+ finish +-----------+
58 * | New | ---------------> | Active | -----------> | Completing | -------> | Completed |
59 * +-----+ +--------+ +-------------+ +-----------+
60 * | | |
61 * | cancel | cancel | cancel
62 * V V |
63 * +-----------+ finish +------------+ |
64 * | Cancelled | <--------- | Cancelling | <----------------+
65 * |(completed)| +------------+
66 * +-----------+
67 * ```
68 *
69 * A job in the [coroutineContext][CoroutineScope.coroutineContext] represents the coroutine itself.
70 * A job is active while the coroutine is working and job's cancellation aborts the coroutine when
71 * the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException].
72 *
73 * A job can have a _parent_ job. A job with a parent is cancelled when its parent is cancelled or completes exceptionally.
74 * Parent job waits for all its children to complete in _completing_ or _cancelling_ state.
75 * _Completing_ state is purely internal to the job. For an outside observer a _completing_ job is still active,
76 * while internally it is waiting for its children.
77 *
78 * All functions on this interface and on all interfaces derived from it are **thread-safe** and can
79 * be safely invoked from concurrent coroutines without external synchronization.
80 */
81public actual interface Job : CoroutineContext.Element {
82
83 // ------------ state query ------------
84
85 /**
86 * Returns `true` when this job is active -- it was already started and has not completed or cancelled yet.
87 * The job that is waiting for its [children] to complete is still considered to be active if it
88 * was not cancelled.
89 */
90 public actual val isActive: Boolean
91
92 /**
93 * Returns `true` when this job has completed for any reason. A job that was cancelled and has
94 * finished its execution is also considered complete. Job becomes complete only after
95 * all its [children] complete.
96 */
97 public actual val isCompleted: Boolean
98
99 /**
100 * Returns `true` if this job was [cancelled][cancel]. In the general case, it does not imply that the
101 * job has already [completed][isCompleted] (it may still be cancelling whatever it was doing).
102 */
103 public actual val isCancelled: Boolean
104
105 /**
Roman Elizarov2a4fb062017-12-20 11:16:03 +0300106 * Returns [CancellationException] that signals the completion of this job. This function is
107 * used by [cancellable][suspendCancellableCoroutine] suspending functions. They throw exception
108 * returned by this function when they suspend in the context of this job and this job becomes _complete_.
Roman Elizarove1c0b652017-12-01 14:02:57 +0300109 *
Roman Elizarov2a4fb062017-12-20 11:16:03 +0300110 * This function returns the original [cancel] cause of this job if that `cause` was an instance of
111 * [CancellationException]. Otherwise (if this job was cancelled with a cause of a different type, or
112 * was cancelled without a cause, or had completed normally), an instance of [JobCancellationException] is
113 * returned. The [JobCancellationException.cause] of the resulting [JobCancellationException] references
114 * the original cancellation cause that was passed to [cancel] function.
Roman Elizarove1c0b652017-12-01 14:02:57 +0300115 *
Roman Elizarov2a4fb062017-12-20 11:16:03 +0300116 * This function throws [IllegalStateException] when invoked on a job that has not
Roman Elizarove1c0b652017-12-01 14:02:57 +0300117 * [completed][isCompleted] nor [cancelled][isCancelled] yet.
Roman Elizarove1c0b652017-12-01 14:02:57 +0300118 */
119 public actual fun getCancellationException(): CancellationException
120
121 // ------------ state update ------------
122
123 /**
124 * Starts coroutine related to this job (if any) if it was not started yet.
125 * The result `true` if this invocation actually started coroutine or `false`
126 * if it was already started or completed.
127 */
128 public actual fun start(): Boolean
129
130 /**
131 * Cancels this job with an optional cancellation [cause]. The result is `true` if this job was
132 * cancelled as a result of this invocation and `false` otherwise
133 * (if it was already _completed_ or if it is [NonCancellable]).
134 * Repeated invocations of this function have no effect and always produce `false`.
135 *
136 * When cancellation has a clear reason in the code, an instance of [CancellationException] should be created
137 * at the corresponding original cancellation site and passed into this method to aid in debugging by providing
138 * both the context of cancellation and text description of the reason.
139 */
140 public actual fun cancel(cause: Throwable? = null): Boolean
141
142 // ------------ parent-child ------------
143
144 /**
145 * Returns a sequence of this job's children.
146 *
147 * A job becomes a child of this job when it is constructed with this job in its
148 * [CoroutineContext] or using an explicit `parent` parameter.
149 *
150 * A parent-child relation has the following effect:
151 *
152 * * Cancellation of parent with [cancel] or its exceptional completion (failure)
153 * immediately cancels all its children.
154 * * Parent cannot complete until all its children are complete. Parent waits for all its children to
155 * complete in _completing_ or _cancelling_ state.
156 * * Uncaught exception in a child, by default, cancels parent. In particular, this applies to
157 * children created with [launch] coroutine builder. Note, that [async] and other future-like
158 * coroutine builders do not have uncaught exceptions by definition, since all their exceptions are
159 * caught and are encapsulated in their result.
160 */
161 public actual val children: Sequence<Job>
162
163 /**
164 * Attaches child job so that this job becomes its parent and
165 * returns a handle that should be used to detach it.
166 *
167 * A parent-child relation has the following effect:
168 * * Cancellation of parent with [cancel] or its exceptional completion (failure)
169 * immediately cancels all its children.
170 * * Parent cannot complete until all its children are complete. Parent waits for all its children to
171 * complete in _completing_ or _cancelling_ state.
172 *
173 * **A child must store the resulting [DisposableHandle] and [dispose][DisposableHandle.dispose] the attachment
174 * to its parent on its own completion.**
175 *
176 * Coroutine builders and job factory functions that accept `parent` [CoroutineContext] parameter
177 * lookup a [Job] instance in the parent context and use this function to attach themselves as a child.
178 * They also store a reference to the resulting [DisposableHandle] and dispose a handle when they complete.
179 *
180 * @suppress This is an internal API. This method is too error prone for public API.
181 */
182 @Deprecated(message = "Start child coroutine with 'parent' parameter", level = DeprecationLevel.WARNING)
183 public actual fun attachChild(child: Job): DisposableHandle
184
185 // ------------ state waiting ------------
186
187 /**
188 * Suspends coroutine until this job is complete. This invocation resumes normally (without exception)
189 * when the job is complete for any reason and the [Job] of the invoking coroutine is still [active][isActive].
190 * This function also [starts][Job.start] the corresponding coroutine if the [Job] was still in _new_ state.
191 *
192 * Note, that the job becomes complete only when all its children are complete.
193 *
194 * This suspending function is cancellable and **always** checks for the cancellation of invoking coroutine's Job.
195 * If the [Job] of the invoking coroutine is cancelled or completed when this
196 * suspending function is invoked or while it is suspended, this function
197 * throws [CancellationException].
198 *
199 * In particular, it means that a parent coroutine invoking `join` on a child coroutine that was started using
200 * `launch(coroutineContext) { ... }` builder throws [CancellationException] if the child
201 * had crashed, unless a non-standard [CoroutineExceptionHandler] if installed in the context.
202 *
203 * There is [cancelAndJoin] function that combines an invocation of [cancel] and `join`.
204 */
205 public actual suspend fun join()
206
207 // ------------ low-level state-notification ------------
208
209 /**
210 * Registers handler that is **synchronously** invoked once on cancellation or completion of this job.
211 * When job is already cancelling or complete, then the handler is immediately invoked
212 * with a job's cancellation cause or `null` unless [invokeImmediately] is set to false.
213 * Otherwise, handler will be invoked once when this job is cancelled or complete.
214 *
215 * Invocation of this handler on a transition to a transient _cancelling_ state
216 * is controlled by [onCancelling] boolean parameter.
217 * The handler is invoked on invocation of [cancel] when
218 * job becomes _cancelling_ if [onCancelling] parameter is set to `true`. However,
219 * when this [Job] is not backed by a coroutine, like [CompletableDeferred] or [CancellableContinuation]
220 * (both of which do not posses a _cancelling_ state), then the value of [onCancelling] parameter is ignored.
221 *
222 * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the
223 * registration of this handler and release its memory if its invocation is no longer needed.
224 * There is no need to dispose the handler after completion of this job. The references to
225 * all the handlers are released when this job completes.
226 *
227 * Installed [handler] should not throw any exceptions. If it does, they will get caught,
228 * wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code.
229 *
230 * **Note**: This function is a part of internal machinery that supports parent-child hierarchies
231 * and allows for implementation of suspending functions that wait on the Job's state.
232 * This function should not be used in general application code.
233 * Implementations of `CompletionHandler` must be fast and _lock-free_.
234 *
235 * @param onCancelling when `true`, then the [handler] is invoked as soon as this job transitions to _cancelling_ state;
236 * when `false` then the [handler] is invoked only when it transitions to _completed_ state.
237 * @param invokeImmediately when `true` and this job is already in the desired state (depending on [onCancelling]),
238 * then the [handler] is immediately and synchronously invoked and [NonDisposableHandle] is returned;
239 * when `false` then [NonDisposableHandle] is returned, but the [handler] is not invoked.
240 * @param handler the handler.
241 */
242 public actual fun invokeOnCompletion(
243 onCancelling: Boolean = false,
244 invokeImmediately: Boolean = true,
245 handler: CompletionHandler): DisposableHandle
246
247 /**
248 * Key for [Job] instance in the coroutine context.
249 */
250 public actual companion object Key : CoroutineContext.Key<Job>
251}
252
253/**
254 * Creates a new job object in an _active_ state.
255 * It is optionally a child of a [parent] job.
256 */
257@Suppress("FunctionName")
258public actual fun Job(parent: Job? = null): Job = JobImpl(parent)
259
260/**
261 * A handle to an allocated object that can be disposed to make it eligible for garbage collection.
262 */
263public actual interface DisposableHandle {
264 /**
265 * Disposes the corresponding object, making it eligible for garbage collection.
266 * Repeated invocation of this function has no effect.
267 */
268 public actual fun dispose()
269}
270
Roman Elizarovd164f732017-12-25 17:05:48 +0300271// -------------------- CoroutineContext extensions --------------------
272
273/**
274 * Cancels [Job] of this context with an optional cancellation [cause]. The result is `true` if the job was
275 * cancelled as a result of this invocation and `false` if there is no job in the context or if it was already
276 * cancelled or completed. See [Job.cancel] for details.
277 */
278public actual fun CoroutineContext.cancel(cause: Throwable? = null): Boolean =
279 this[Job]?.cancel(cause) ?: false
280
281/**
282 * Cancels all children of the [Job] in this context with an optional cancellation [cause].
283 * It does not do anything if there is no job in the context or it has no children.
284 * See [Job.cancelChildren] for details.
285 */
286public actual fun CoroutineContext.cancelChildren(cause: Throwable? = null) {
287 this[Job]?.cancelChildren(cause)
288}
289
Roman Elizarove1c0b652017-12-01 14:02:57 +0300290// -------------------- Job extensions --------------------
291
292/**
293 * Disposes a specified [handle] when this job is complete.
294 *
295 * This is a shortcut for the following code:
296 * ```
297 * invokeOnCompletion { handle.dispose() }
298 * ```
299 */
300public actual fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle =
301 invokeOnCompletion { handle.dispose() }
302
303/**
304 * Cancels the job and suspends invoking coroutine until the cancelled job is complete.
305 *
306 * This suspending function is cancellable and **always** checks for the cancellation of invoking coroutine's Job.
307 * If the [Job] of the invoking coroutine is cancelled or completed when this
308 * suspending function is invoked or while it is suspended, this function
309 * throws [CancellationException].
310 *
311 * In particular, it means that a parent coroutine invoking `cancelAndJoin` on a child coroutine that was started using
312 * `launch(coroutineContext) { ... }` builder throws [CancellationException] if the child
313 * had crashed, unless a non-standard [CoroutineExceptionHandler] if installed in the context.
314 *
315 * This is a shortcut for the invocation of [cancel][Job.cancel] followed by [join][Job.join].
316 */
317public actual suspend fun Job.cancelAndJoin() {
318 cancel()
319 return join()
320}
321
322/**
323 * Cancels all [children][Job.children] jobs of this coroutine with the given [cause] using [Job.cancel]
324 * for all of them. Unlike [Job.cancel] on this job as a whole, the state of this job itself is not affected.
325 */
326public actual fun Job.cancelChildren(cause: Throwable? = null) {
327 children.forEach { it.cancel(cause) }
328}
329
330/**
331 * Suspends coroutine until all [children][Job.children] of this job are complete using
332 * [Job.join] for all of them. Unlike [Job.join] on this job as a whole, it does not wait until
333 * this job is complete.
334 */
335public actual suspend fun Job.joinChildren() {
336 children.forEach { it.join() }
337}
338
339/**
340 * No-op implementation of [DisposableHandle].
341 */
342public actual object NonDisposableHandle : DisposableHandle {
343 /** Does not do anything. */
344 actual override fun dispose() {}
345
346 /** Returns "NonDisposableHandle" string. */
347 override fun toString(): String = "NonDisposableHandle"
348}
349
350// --------------- helper classes to simplify job implementation
351
Roman Elizarov45c1a732017-12-21 11:53:31 +0300352
Roman Elizarove1c0b652017-12-01 14:02:57 +0300353/**
354 * A concrete implementation of [Job]. It is optionally a child to a parent job.
355 * This job is cancelled when the parent is complete, but not vise-versa.
356 *
357 * This is an open class designed for extension by more specific classes that might augment the
358 * state and mare store addition state information for completed jobs, like their result values.
359 *
360 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
361 * @suppress **This is unstable API and it is subject to change.**
362 */
363public open class JobSupport(active: Boolean) : Job {
364 override val key: CoroutineContext.Key<*> get() = Job
365
366 // Note: use shared objects while we have no listeners
367 protected var state: Any? = if (active) EmptyActive else EmptyNew
368 private set
369
370 private var parentHandle: DisposableHandle? = null
371
372 // ------------ initialization ------------
373
374 /**
375 * Initializes parent job.
376 * It shall be invoked at most once after construction after all other initialization.
377 */
378 public fun initParentJob(parent: Job?) {
Roman Elizarov9d5abcd2017-12-21 16:54:30 +0300379 check(parentHandle == null) { "Shall be invoked at most once" }
Roman Elizarove1c0b652017-12-01 14:02:57 +0300380 if (parent == null) {
381 parentHandle = NonDisposableHandle
382 return
383 }
384 parent.start() // make sure the parent is started
385 @Suppress("DEPRECATION")
386 val handle = parent.attachChild(this)
387 parentHandle = handle
388 // now check our state _after_ registering (see updateState order of actions)
389 if (isCompleted) {
390 handle.dispose()
391 parentHandle = NonDisposableHandle // release it just in case, to aid GC
392 }
393 }
394
395 // ------------ state query ------------
396
397 public final override val isActive: Boolean get() {
398 val state = this.state
399 return state is Incomplete && state.isActive
400 }
401
402 public final override val isCompleted: Boolean get() = state !is Incomplete
403
404 public final override val isCancelled: Boolean get() {
405 val state = this.state
406 return state is Cancelled || (state is Finishing && state.cancelled != null)
407 }
408
409 // ------------ state update ------------
410
411 /**
412 * Updates current [state] of this job.
413 * @suppress **This is unstable API and it is subject to change.**
414 */
415 internal fun updateState(proposedUpdate: Any?, mode: Int) {
416 val state = this.state as Incomplete // current state must be incomplete
417 val update = coerceProposedUpdate(state, proposedUpdate)
418 tryUpdateState(update)
419 completeUpdateState(state, update, mode)
420 }
421
422 internal fun tryUpdateState(update: Any?) {
423 require(update !is Incomplete) // only incomplete -> completed transition is allowed
424 this.state = update
425 // Unregister from parent job
426 parentHandle?.let {
427 it.dispose()
428 parentHandle = NonDisposableHandle // release it just in case, to aid GC
429 }
430 }
431
432 // when Job is in Cancelling state, it can only be promoted to Cancelled state,
433 // so if the proposed Update is not an appropriate Cancelled (preserving the cancellation cause),
434 // then the corresponding Cancelled state is constructed.
435 private fun coerceProposedUpdate(expect: Incomplete, proposedUpdate: Any?): Any? =
436 if (expect is Finishing && expect.cancelled != null && !isCorrespondinglyCancelled(expect.cancelled, proposedUpdate))
437 createCancelled(expect.cancelled, proposedUpdate) else proposedUpdate
438
439 private fun isCorrespondinglyCancelled(cancelled: Cancelled, proposedUpdate: Any?): Boolean {
440 if (proposedUpdate !is Cancelled) return false
441 // NOTE: equality comparison of causes is performed here by design, see equals of JobCancellationException
442 return proposedUpdate.cause == cancelled.cause ||
443 proposedUpdate.cause is JobCancellationException && cancelled.cause == null
444 }
445
446 private fun createCancelled(cancelled: Cancelled, proposedUpdate: Any?): Cancelled {
447 if (proposedUpdate !is CompletedExceptionally) return cancelled // not exception -- just use original cancelled
448 val exception = proposedUpdate.exception
449 if (cancelled.exception == exception) return cancelled // that is the cancelled we need already!
450 //cancelled.cause?.let { exception.addSuppressed(it) }
451 return Cancelled(this, exception)
452 }
453
454 /**
455 * Completes update of the current [state] of this job.
456 * @suppress **This is unstable API and it is subject to change.**
457 */
458 internal fun completeUpdateState(expect: Incomplete, update: Any?, mode: Int) {
459 // Invoke completion handlers
460 val exceptionally = update as? CompletedExceptionally
461 val cause = exceptionally?.cause
462 if (expect is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
463 try {
464 expect.invoke(cause)
465 } catch (ex: Throwable) {
466 handleException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
467 }
468 } else {
469 expect.list?.notifyCompletion(cause)
470 }
471 // Do overridable processing after completion handlers
472 if (!expect.isCancelling) onCancellation(exceptionally) // only notify when was not cancelling before
473 afterCompletion(update, mode)
474 }
475
476 private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
477 var exception: Throwable? = null
478 list.forEach<T> { node ->
479 try {
480 node.invoke(cause)
481 } catch (ex: Throwable) {
482 exception?.apply { /* addSuppressed(ex) */ } ?: run {
483 exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
484 }
485 }
486 }
487 exception?.let { handleException(it) }
488 }
489
490 private fun NodeList.notifyCompletion(cause: Throwable?) =
491 notifyHandlers<JobNode<*>>(this, cause)
492
493 private fun notifyCancellation(list: NodeList, cause: Throwable?) =
494 notifyHandlers<JobCancellationNode<*>>(list, cause)
495
496 public final override fun start(): Boolean {
497 val state = this.state
498 when (state) {
499 is Empty -> { // EMPTY_X state -- no completion handlers
500 if (state.isActive) return false // already active
Roman Elizarov9d5abcd2017-12-21 16:54:30 +0300501 this.state = EmptyActive
Roman Elizarove1c0b652017-12-01 14:02:57 +0300502 onStart()
503 return true
504 }
505 is NodeList -> { // LIST -- a list of completion handlers (either new or active)
506 return state.makeActive().also { result ->
507 if (result) onStart()
508 }
509 }
510 else -> return false // not a new state
511 }
512 }
513
514 /**
515 * Override to provide the actual [start] action.
516 */
517 protected open fun onStart() {}
518
519 public final override fun getCancellationException(): CancellationException {
520 val state = this.state
521 return when {
522 state is Finishing && state.cancelled != null ->
523 state.cancelled.exception.toCancellationException("Job is being cancelled")
524 state is Incomplete ->
525 error("Job was not completed or cancelled yet: $this")
526 state is CompletedExceptionally ->
527 state.exception.toCancellationException("Job has failed")
528 else -> JobCancellationException("Job has completed normally", null, this)
529 }
530 }
531
532 private fun Throwable.toCancellationException(message: String): CancellationException =
533 this as? CancellationException ?: JobCancellationException(message, this, this@JobSupport)
534
535 /**
536 * Returns the cause that signals the completion of this job -- it returns the original
537 * [cancel] cause or **`null` if this job had completed
538 * normally or was cancelled without a cause**. This function throws
539 * [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
540 * [isCancelled] yet.
541 */
542 protected fun getCompletionCause(): Throwable? {
543 val state = this.state
544 return when {
545 state is Finishing && state.cancelled != null -> state.cancelled.cause
546 state is Incomplete -> error("Job was not completed or cancelled yet")
547 state is CompletedExceptionally -> state.cause
548 else -> null
549 }
550 }
551
552 public final override fun invokeOnCompletion(onCancelling: Boolean, invokeImmediately: Boolean, handler: CompletionHandler) =
553 installNode(onCancelling, invokeImmediately, makeNode(handler, onCancelling))
554
555 private fun installNode(
556 onCancelling: Boolean,
557 invokeImmediately: Boolean,
558 node: JobNode<*>
559 ): DisposableHandle {
560 while (true) {
561 val state = this.state
562 when (state) {
563 is Empty -> { // EMPTY_X state -- no completion handlers
564 if (state.isActive) {
565 // move to SINGLE state
566 this.state = node
567 return node
568 } else
569 promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
570 }
571 is Incomplete -> {
572 val list = state.list
573 if (list == null) { // SINGLE/SINGLE+
574 promoteSingleToNodeList(state as JobNode<*>)
575 } else {
576 if (state is Finishing && state.cancelled != null && onCancelling) {
Roman Elizarov9d5abcd2017-12-21 16:54:30 +0300577 // cannot be in this state unless were support cancelling state
Roman Elizarov4d626de2018-01-11 22:57:28 +0300578 check(onCancelMode != ON_CANCEL_MAKE_CANCELLED) // cannot be in this state unless were support cancelling state
Roman Elizarove1c0b652017-12-01 14:02:57 +0300579 // installing cancellation handler on job that is being cancelled
580 if (invokeImmediately) node.invoke(state.cancelled.cause)
581 return NonDisposableHandle
582 }
583 list.addLast(node)
584 return node
585 }
586 }
587 else -> { // is complete
588 if (invokeImmediately) node.invoke((state as? CompletedExceptionally)?.cause)
589 return NonDisposableHandle
590 }
591 }
592 }
593 }
594
Roman Elizarov4d626de2018-01-11 22:57:28 +0300595 private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
596 val hasCancellingState = onCancelMode != ON_CANCEL_MAKE_CANCELLED
597 return if (onCancelling && hasCancellingState)
Roman Elizarove1c0b652017-12-01 14:02:57 +0300598 InvokeOnCancellation(this, handler)
599 else
600 InvokeOnCompletion(this, handler)
Roman Elizarov4d626de2018-01-11 22:57:28 +0300601 }
Roman Elizarove1c0b652017-12-01 14:02:57 +0300602
603
604 private fun promoteEmptyToNodeList(state: Empty) {
Roman Elizarov9d5abcd2017-12-21 16:54:30 +0300605 check(state === this.state) { "Expected empty state"}
Roman Elizarove1c0b652017-12-01 14:02:57 +0300606 // promote it to list in new state
607 this.state = NodeList(state.isActive)
608 }
609
610 private fun promoteSingleToNodeList(state: JobNode<*>) {
Roman Elizarov9d5abcd2017-12-21 16:54:30 +0300611 check(state === this.state) { "Expected single state" }
Roman Elizarove1c0b652017-12-01 14:02:57 +0300612 // promote it to list (SINGLE+ state)
613 val list = NodeList(isActive = true)
614 list.addLast(state)
615 this.state = list
616 }
617
618 final override suspend fun join() {
619 if (!joinInternal()) { // fast-path no wait
620 return suspendCoroutineOrReturn { cont ->
621 cont.context.checkCompletion()
622 Unit // do not suspend
623 }
624 }
625 return joinSuspend() // slow-path wait
626 }
627
628 private fun joinInternal(): Boolean {
629 if (state !is Incomplete) return false // not active anymore (complete) -- no need to wait
630 start()
631 return true // wait
632 }
633
634 private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
635 val handle = invokeOnCompletion { cont.resume(Unit) }
636 cont.invokeOnCompletion { handle.dispose() }
637 }
638
639 internal fun removeNode(node: JobNode<*>) {
640 // remove logic depends on the state of the job
641 val state = this.state
642 when (state) {
643 is JobNode<*> -> { // SINGE/SINGLE+ state -- one completion handler
644 if (state !== node) return // a different job node --> we were already removed
645 // remove and revert back to empty state
646 this.state = EmptyActive
647 }
648 is Incomplete -> { // may have a list of completion handlers
649 // remove node from the list if there is a list
650 if (state.list != null) node.remove()
651 }
652 }
653 }
654
Roman Elizarov4d626de2018-01-11 22:57:28 +0300655 protected open val onCancelMode: Int get() = ON_CANCEL_MAKE_CANCELLING
Roman Elizarove1c0b652017-12-01 14:02:57 +0300656
Roman Elizarov4d626de2018-01-11 22:57:28 +0300657 public override fun cancel(cause: Throwable?): Boolean = when (onCancelMode) {
658 ON_CANCEL_MAKE_CANCELLED -> makeCancelled(cause)
659 ON_CANCEL_MAKE_CANCELLING -> makeCancelling(cause)
660 ON_CANCEL_MAKE_COMPLETING -> makeCompletingOnCancel(cause)
661 else -> error("Invalid onCancelMode $onCancelMode")
662 }
Roman Elizarove1c0b652017-12-01 14:02:57 +0300663
664 // we will be dispatching coroutine to process its cancellation exception, so there is no need for
665 // an extra check for Job status in MODE_CANCELLABLE
666 private fun updateStateCancelled(cause: Throwable?) =
667 updateState(Cancelled(this, cause), mode = MODE_ATOMIC_DEFAULT)
668
669 // transitions to Cancelled state
670 private fun makeCancelled(cause: Throwable?): Boolean {
671 if (state !is Incomplete) return false // quit if already complete
672 updateStateCancelled(cause)
673 return true
674 }
675
676 // transitions to Cancelling state
677 private fun makeCancelling(cause: Throwable?): Boolean {
678 while (true) {
679 val state = this.state
680 when (state) {
681 is Empty -> { // EMPTY_X state -- no completion handlers
682 if (state.isActive) {
683 promoteEmptyToNodeList(state) // this way can wrap it into Cancelling on next pass
684 } else {
685 // cancelling a non-started coroutine makes it immediately cancelled
686 // (and we have no listeners to notify which makes it very simple)
687 updateStateCancelled(cause)
688 return true
689 }
690 }
691 is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
692 promoteSingleToNodeList(state)
693 }
694 is NodeList -> { // LIST -- a list of completion handlers (either new or active)
695 if (state.isActive) {
696 makeCancellingList(state.list, cause)
697 return true
698 } else {
699 // cancelling a non-started coroutine makes it immediately cancelled
700 updateStateCancelled(cause)
701 return true
702 }
703 }
704 is Finishing -> { // Completing/Cancelling the job, may cancel
705 if (state.cancelled != null) return false // already cancelling
706 makeCancellingList(state.list, cause)
707 return true
708 }
709 else -> { // is inactive
710 return false
711 }
712 }
713 }
714 }
715
716 // make expected state in cancelling
717 private fun makeCancellingList(list: NodeList, cause: Throwable?) {
718 val cancelled = Cancelled(this, cause)
719 state = Finishing(list, cancelled, false)
720 notifyCancellation(list, cause)
721 onCancellation(cancelled)
722 }
723
Roman Elizarov4d626de2018-01-11 22:57:28 +0300724 private fun makeCompletingOnCancel(cause: Throwable?): Boolean =
725 makeCompleting(Cancelled(this, cause))
726
727 internal fun makeCompleting(proposedUpdate: Any?): Boolean =
728 when (makeCompletingInternal(proposedUpdate, mode = MODE_ATOMIC_DEFAULT)) {
729 COMPLETING_ALREADY_COMPLETING -> false
730 else -> true
731 }
732
Roman Elizarove1c0b652017-12-01 14:02:57 +0300733 /**
734 * Returns:
735 * * `true` if state was updated to completed/cancelled;
736 * * `false` if made completing or it is cancelling and is waiting for children.
737 *
738 * @throws IllegalStateException if job is already complete or completing
739 * @suppress **This is unstable API and it is subject to change.**
740 */
Roman Elizarov4d626de2018-01-11 22:57:28 +0300741 internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean =
742 when (makeCompletingInternal(proposedUpdate, mode)) {
743 COMPLETING_COMPLETED -> true
744 COMPLETING_WAITING_CHILDREN -> false
745 else -> throw IllegalStateException("Job $this is already complete or completing, " +
746 "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
747 }
748
749 private fun makeCompletingInternal(proposedUpdate: Any?, mode: Int): Int {
Roman Elizarove1c0b652017-12-01 14:02:57 +0300750 loop@ while (true) {
751 val state = this.state
752 @Suppress("FoldInitializerAndIfToElvis")
753 if (state !is Incomplete)
Roman Elizarov4d626de2018-01-11 22:57:28 +0300754 return COMPLETING_ALREADY_COMPLETING
Roman Elizarove1c0b652017-12-01 14:02:57 +0300755 if (state is Finishing && state.completing)
Roman Elizarov4d626de2018-01-11 22:57:28 +0300756 return COMPLETING_ALREADY_COMPLETING
Roman Elizarove1c0b652017-12-01 14:02:57 +0300757 val child: Child = firstChild(state) ?: run {
758 // or else complete immediately w/o children
759 updateState(proposedUpdate, mode)
Roman Elizarov4d626de2018-01-11 22:57:28 +0300760 return COMPLETING_COMPLETED
Roman Elizarove1c0b652017-12-01 14:02:57 +0300761 }
762 // must promote to list to correct operate on child lists
763 if (state is JobNode<*>) {
764 promoteSingleToNodeList(state)
Roman Elizarov45c1a732017-12-21 11:53:31 +0300765 continue@loop // retry
Roman Elizarove1c0b652017-12-01 14:02:57 +0300766 }
767 // cancel all children in list on exceptional completion
768 if (proposedUpdate is CompletedExceptionally)
769 child.cancelChildrenInternal(proposedUpdate.exception)
770 // switch to completing state
771 val completing = Finishing(state.list!!, (state as? Finishing)?.cancelled, true)
772 this.state = completing
773 if (tryWaitForChild(child, proposedUpdate))
Roman Elizarov4d626de2018-01-11 22:57:28 +0300774 return COMPLETING_WAITING_CHILDREN
775 updateState(proposedUpdate, mode = MODE_ATOMIC_DEFAULT)
776 return COMPLETING_COMPLETED
Roman Elizarove1c0b652017-12-01 14:02:57 +0300777 }
778 }
779
780 private tailrec fun Child.cancelChildrenInternal(cause: Throwable) {
781 childJob.cancel(JobCancellationException("Child job was cancelled because of parent failure", cause, childJob))
782 nextChild()?.cancelChildrenInternal(cause)
783 }
784
785 private val Any?.exceptionOrNull: Throwable?
786 get() = (this as? CompletedExceptionally)?.exception
787
788 private fun firstChild(state: Incomplete) =
789 state as? Child ?: state.list?.nextChild()
790
791 // return false when there is no more incomplete children to wait
792 private tailrec fun tryWaitForChild(child: Child, proposedUpdate: Any?): Boolean {
793 val handle = child.childJob.invokeOnCompletion(invokeImmediately = false) {
794 continueCompleting(child, proposedUpdate)
795 }
796 if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
797 val nextChild = child.nextChild() ?: return false
798 return tryWaitForChild(nextChild, proposedUpdate)
799 }
800
801 internal fun continueCompleting(lastChild: Child, proposedUpdate: Any?) {
802 val state = this.state
803 @Suppress("FoldInitializerAndIfToElvis")
804 if (state !is Finishing)
805 throw IllegalStateException("Job $this is found in expected state while completing with $proposedUpdate", proposedUpdate.exceptionOrNull)
806 // figure out if we need to wait for next child
807 val waitChild = lastChild.nextChild()
808 // try wait for next child
809 if (waitChild != null && tryWaitForChild(waitChild, proposedUpdate)) return // waiting for next child
810 // no more children to wait -- update state
Roman Elizarov4d626de2018-01-11 22:57:28 +0300811 updateState(proposedUpdate, mode = MODE_ATOMIC_DEFAULT)
Roman Elizarove1c0b652017-12-01 14:02:57 +0300812 }
813
814 private fun LinkedListNode.nextChild(): Child? {
815 var cur = this
816 while (cur.isRemoved) cur = cur.prev // rollback to prev non-removed (or list head)
817 while (true) {
818 cur = cur.next
819 if (cur is Child) return cur
820 if (cur is NodeList) return null // checked all -- no more children
821 }
822 }
823
824 override val children: Sequence<Job> get() = buildSequence<Job> {
825 val state = this@JobSupport.state
826 when (state) {
827 is Child -> yield(state.childJob)
828 is Incomplete -> state.list?.let { list ->
829 list.forEach<Child> { yield(it.childJob) }
830 }
831 }
832 }
833
834 @Suppress("OverridingDeprecatedMember")
835 override fun attachChild(child: Job): DisposableHandle =
836 installNode(onCancelling = true, invokeImmediately = true, node = Child(this, child))
837
838 /**
839 * Override to process any exceptions that were encountered while invoking completion handlers
840 * installed via [invokeOnCompletion].
841 */
842 protected open fun handleException(exception: Throwable) {
843 throw exception
844 }
845
846 /**
847 * It is invoked once when job is cancelled or is completed, similarly to [invokeOnCompletion] with
848 * `onCancelling` set to `true`.
849 * @param exceptionally not null when the the job was cancelled or completed exceptionally,
850 * null when it has completed normally.
851 * @suppress **This is unstable API and it is subject to change.**
852 */
853 protected open fun onCancellation(exceptionally: CompletedExceptionally?) {}
854
855 /**
856 * Override for post-completion actions that need to do something with the state.
857 * @param mode completion mode.
858 * @suppress **This is unstable API and it is subject to change.**
859 */
860 protected open fun afterCompletion(state: Any?, mode: Int) {}
861
862 // for nicer debugging
Roman Elizarov45c1a732017-12-21 11:53:31 +0300863 override fun toString(): String = "Job{${stateString()}}"
Roman Elizarove1c0b652017-12-01 14:02:57 +0300864
Roman Elizarov45c1a732017-12-21 11:53:31 +0300865 protected fun stateString(): String {
Roman Elizarove1c0b652017-12-01 14:02:57 +0300866 val state = this.state
867 return when (state) {
868 is Finishing -> buildString {
869 if (state.cancelled != null) append("Cancelling")
870 if (state.completing) append("Completing")
871 }
872 is Incomplete -> if (state.isActive) "Active" else "New"
873 is Cancelled -> "Cancelled"
874 is CompletedExceptionally -> "CompletedExceptionally"
875 else -> "Completed"
876 }
877 }
878
879 /**
880 * @suppress **This is unstable API and it is subject to change.**
881 */
882 internal interface Incomplete {
883 val isActive: Boolean
884 val list: NodeList? // is null only for Empty and JobNode incomplete state objects
885 }
886
887 // Cancelling or Completing
888 private class Finishing(
889 override val list: NodeList,
890 val cancelled: Cancelled?, /* != null when cancelling */
891 val completing: Boolean /* true when completing */
892 ) : Incomplete {
893 override val isActive: Boolean get() = cancelled == null
894 }
895
896 private val Incomplete.isCancelling: Boolean
897 get() = this is Finishing && cancelled != null
898
899 /**
900 * @suppress **This is unstable API and it is subject to change.**
901 */
902 internal class NodeList(
903 override var isActive: Boolean
904 ) : LinkedListHead(), Incomplete {
905 override val list: NodeList get() = this
906
907 fun makeActive(): Boolean {
908 if (isActive) return false
909 isActive = true
910 return true
911 }
912
913 override fun toString(): String = buildString {
914 append("List")
915 append(if (isActive) "{Active}" else "{New}")
916 append("[")
917 var first = true
918 this@NodeList.forEach<JobNode<*>> { node ->
919 if (first) first = false else append(", ")
920 append(node)
921 }
922 append("]")
923 }
924 }
925
Roman Elizarove1c0b652017-12-01 14:02:57 +0300926 /*
927 * =================================================================================================
928 * This is ready-to-use implementation for Deferred interface.
929 * However, it is not type-safe. Conceptually it just exposes the value of the underlying
930 * completed state as `Any?`
931 * =================================================================================================
932 */
933
934 public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
935
936 public fun getCompletionExceptionOrNull(): Throwable? {
937 val state = this.state
938 check(state !is Incomplete) { "This job has not completed yet" }
939 return state.exceptionOrNull
940 }
941
942 protected fun getCompletedInternal(): Any? {
943 val state = this.state
944 check(state !is Incomplete) { "This job has not completed yet" }
945 if (state is CompletedExceptionally) throw state.exception
946 return state
947 }
948
949 protected suspend fun awaitInternal(): Any? {
950 val state = this.state
951 if (state !is Incomplete) {
952 // already complete -- just return result
953 if (state is CompletedExceptionally) throw state.exception
954 return state
955 }
956 start()
957 return awaitSuspend() // slow-path
958 }
959
960 private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
961 val handle = invokeOnCompletion {
962 val state = this.state
Roman Elizarov9d5abcd2017-12-21 16:54:30 +0300963 check(state !is Incomplete) { "State should be complete "}
Roman Elizarove1c0b652017-12-01 14:02:57 +0300964 if (state is CompletedExceptionally)
965 cont.resumeWithException(state.exception)
966 else
967 cont.resume(state)
968 }
969 cont.invokeOnCompletion { handle.dispose() }
970 }
971}
972
Roman Elizarov4d626de2018-01-11 22:57:28 +0300973internal const val ON_CANCEL_MAKE_CANCELLED = 0
974internal const val ON_CANCEL_MAKE_CANCELLING = 1
975internal const val ON_CANCEL_MAKE_COMPLETING = 2
976
977private const val COMPLETING_ALREADY_COMPLETING = 0
978private const val COMPLETING_COMPLETED = 1
979private const val COMPLETING_WAITING_CHILDREN = 2
980
Roman Elizarove1c0b652017-12-01 14:02:57 +0300981@Suppress("PrivatePropertyName")
982private val EmptyNew = Empty(false)
983@Suppress("PrivatePropertyName")
984private val EmptyActive = Empty(true)
985
986private class Empty(override val isActive: Boolean) : JobSupport.Incomplete {
987 override val list: JobSupport.NodeList? get() = null
988 override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
989}
990
991private class JobImpl(parent: Job? = null) : JobSupport(true) {
992 init { initParentJob(parent) }
Roman Elizarov4d626de2018-01-11 22:57:28 +0300993 override val onCancelMode: Int get() = ON_CANCEL_MAKE_COMPLETING
Roman Elizarove1c0b652017-12-01 14:02:57 +0300994}
995
996// -------- invokeOnCompletion nodes
997
998internal abstract class JobNode<out J : Job>(
999 val job: J
1000) : LinkedListNode(), DisposableHandle, JobSupport.Incomplete {
1001 final override val isActive: Boolean get() = true
1002 final override val list: JobSupport.NodeList? get() = null
1003 final override fun dispose() = (job as JobSupport).removeNode(this)
1004 abstract fun invoke(reason: Throwable?) // CompletionHandler -- invoked on completion
1005}
1006
1007private class InvokeOnCompletion(
1008 job: Job,
1009 private val handler: CompletionHandler
1010) : JobNode<Job>(job) {
1011 override fun invoke(reason: Throwable?) = handler.invoke(reason)
1012 override fun toString() = "InvokeOnCompletion"
1013}
1014
1015// -------- invokeOnCancellation nodes
1016
1017/**
1018 * Marker for node that shall be invoked on cancellation (in _cancelling_ state).
1019 * **Note: may be invoked multiple times during cancellation.**
1020 */
1021internal abstract class JobCancellationNode<out J : Job>(job: J) : JobNode<J>(job)
1022
1023private class InvokeOnCancellation(
1024 job: Job,
1025 private val handler: CompletionHandler
1026) : JobCancellationNode<Job>(job) {
1027 // delegate handler shall be invoked at most once, so here is an additional flag
1028 private var invoked = false
1029 override fun invoke(reason: Throwable?) {
1030 if (invoked) return
1031 invoked = true
1032 handler.invoke(reason)
1033 }
1034 override fun toString() = "InvokeOnCancellation"
1035}
1036
1037internal class Child(
1038 parent: JobSupport,
1039 val childJob: Job
1040) : JobCancellationNode<JobSupport>(parent) {
1041 override fun invoke(reason: Throwable?) {
1042 // Always materialize the actual instance of parent's completion exception and cancel child with it
1043 childJob.cancel(job.getCancellationException())
1044 }
1045 override fun toString(): String = "Child[$childJob]"
1046}
1047