blob: 87f7305c4482a08c7a2f63e2ceb597d4245c6dfb [file] [log] [blame]
Roman Elizarov3754f952017-01-18 20:47:54 +03001package kotlinx.coroutines.experimental
2
Roman Elizarova5676592017-01-19 10:31:13 +03003import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
4import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
Roman Elizarov3754f952017-01-18 20:47:54 +03005import java.util.concurrent.Future
6import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
7import kotlin.coroutines.AbstractCoroutineContextElement
8import kotlin.coroutines.Continuation
9import kotlin.coroutines.CoroutineContext
10
11// --------------- core job interfaces ---------------
12
13/**
14 * A background job. It has two states: _active_ (initial state) and _completed_ (final state).
Roman Elizarov44ba4b12017-01-25 11:37:54 +030015 *
16 * A job can be _cancelled_ at any time with [cancel] function that forces it to become completed immediately.
17 * A job in the coroutine [context][CoroutineScope.context] represents the coroutine itself.
Roman Elizarov3754f952017-01-18 20:47:54 +030018 * A job is active while the coroutine is working and job's cancellation aborts the coroutine when
19 * the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException]
20 * inside the coroutine.
21 *
Roman Elizarov44ba4b12017-01-25 11:37:54 +030022 * A job can have a _parent_. A job with a parent is cancelled when its parent completes.
Roman Elizarov3754f952017-01-18 20:47:54 +030023 *
24 * All functions on this interface are thread-safe.
25 */
26public interface Job : CoroutineContext.Element {
Roman Elizarovd528e3e2017-01-23 15:40:05 +030027 /**
28 * Key for [Job] instance in the coroutine context.
29 */
Roman Elizarov3754f952017-01-18 20:47:54 +030030 public companion object Key : CoroutineContext.Key<Job> {
31 /**
32 * Creates new job object. It is optionally a child of a [parent] job.
33 */
Roman Elizarov58a7add2017-01-20 12:19:52 +030034 public operator fun invoke(parent: Job? = null): Job = JobImpl(parent)
Roman Elizarov3754f952017-01-18 20:47:54 +030035 }
36
37 /**
38 * Returns `true` when job is still active.
39 */
40 public val isActive: Boolean
41
42 /**
43 * Registers completion handler. The action depends on the state of this job.
44 * When job is cancelled with [cancel], then the handler is immediately invoked
45 * with a cancellation reason. Otherwise, handler will be invoked once when this
46 * job is complete (cancellation also is a form of completion).
47 * The resulting [Registration] can be used to [Registration.unregister] if this
48 * registration is no longer needed. There is no need to unregister after completion.
49 */
50 public fun onCompletion(handler: CompletionHandler): Registration
51
52 /**
53 * Cancel this activity with an optional cancellation [reason]. The result is `true` if this job was
54 * cancelled as a result of this invocation and `false` otherwise (if it was already cancelled).
55 * When cancellation has a clear reason in the code, an instance of [CancellationException] should be created
56 * at the corresponding original cancellation site and passed into this method to aid in debugging by providing
57 * both the context of cancellation and text description of the reason.
58 */
59 public fun cancel(reason: Throwable? = null): Boolean
60
61 /**
62 * Registration object for [onCompletion]. It can be used to [unregister] if needed.
63 * There is no need to unregister after completion.
64 */
65 public interface Registration {
66 /**
67 * Unregisters completion handler.
68 */
69 public fun unregister()
70 }
71}
72
Roman Elizarov58a7add2017-01-20 12:19:52 +030073public typealias CompletionHandler = (Throwable?) -> Unit
Roman Elizarov3754f952017-01-18 20:47:54 +030074
Roman Elizarov58a7add2017-01-20 12:19:52 +030075/**
76 * Thrown by cancellable suspending functions if the [Job] of the coroutine is cancelled while it is suspending.
77 */
78public typealias CancellationException = java.util.concurrent.CancellationException
Roman Elizarov3754f952017-01-18 20:47:54 +030079
80/**
81 * Unregisters a specified [registration] when this job is complete.
82 * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
83 * ```
84 * onCompletion { registration.unregister() }
85 * ```
86 */
87public fun Job.unregisterOnCompletion(registration: Job.Registration): Job.Registration =
88 onCompletion(UnregisterOnCompletion(this, registration))
89
90/**
91 * Cancels a specified [future] when this job is complete.
92 * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
93 * ```
94 * onCompletion { future.cancel(true) }
95 * ```
96 */
97public fun Job.cancelFutureOnCompletion(future: Future<*>): Job.Registration =
98 onCompletion(CancelFutureOnCompletion(this, future))
99
Roman Elizarov53a0a402017-01-19 20:21:57 +0300100internal fun Job.removeOnCompletion(node: LockFreeLinkedListNode): Job.Registration =
101 onCompletion(RemoveOnCompletion(this, node))
102
Roman Elizarov3754f952017-01-18 20:47:54 +0300103/**
104 * Suspends coroutine until this job is complete. This invocation resumes normally (without exception)
105 * when the job is complete for any reason.
106 *
107 * This suspending function is cancellable. If the [Job] of the invoking coroutine is completed while this
108 * suspending function is suspended, this function immediately resumes with [CancellationException].
109 */
110public suspend fun Job.join() {
111 if (!isActive) return // fast path
112 return suspendCancellableCoroutine { cont ->
113 cont.unregisterOnCompletion(onCompletion(ResumeOnCompletion(this, cont)))
114 }
115}
116
117// --------------- utility classes to simplify job implementation
118
119/**
120 * A concrete implementation of [Job]. It is optionally a child to a parent job.
121 * This job is cancelled when the parent is complete, but not vise-versa.
122 *
123 * This is an open class designed for extension by more specific classes that might augment the
124 * state and mare store addition state information for completed jobs, like their result values.
125 */
126@Suppress("LeakingThis")
Roman Elizarov58a7add2017-01-20 12:19:52 +0300127public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
Roman Elizarov3754f952017-01-18 20:47:54 +0300128 // keeps a stack of cancel listeners or a special CANCELLED, other values denote completed scope
129 @Volatile
Roman Elizarova5676592017-01-19 10:31:13 +0300130 private var state: Any? = ActiveList() // will drop the list on cancel
Roman Elizarov3754f952017-01-18 20:47:54 +0300131
Roman Elizarov58a7add2017-01-20 12:19:52 +0300132 @Volatile
133 private var registration: Job.Registration? = null
Roman Elizarov3754f952017-01-18 20:47:54 +0300134
135 protected companion object {
136 @JvmStatic
137 private val STATE: AtomicReferenceFieldUpdater<JobSupport, Any?> =
138 AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "state")
139 }
140
Roman Elizarov58a7add2017-01-20 12:19:52 +0300141 // invoke at most once after construction after all other initialization
Roman Elizarov41c5c8b2017-01-25 13:37:15 +0300142 public fun initParentJob(parent: Job?) {
Roman Elizarov58a7add2017-01-20 12:19:52 +0300143 if (parent == null) return
144 check(registration == null)
145 // directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
146 val newRegistration = parent.onCompletion(CancelOnCompletion(parent, this))
147 registration = newRegistration
148 // now check our state _after_ registering (see updateState order of actions)
149 if (state !is Active) newRegistration.unregister()
150 }
151
Roman Elizarov3754f952017-01-18 20:47:54 +0300152 protected fun getState(): Any? = state
153
154 protected fun updateState(expect: Any, update: Any?): Boolean {
Roman Elizarova5676592017-01-19 10:31:13 +0300155 expect as ActiveList // assert type
Roman Elizarov3754f952017-01-18 20:47:54 +0300156 require(update !is Active) // only active -> inactive transition is allowed
157 if (!STATE.compareAndSet(this, expect, update)) return false
Roman Elizarov41c5c8b2017-01-25 13:37:15 +0300158 // #1. Update linked state before invoking completion handlers
159 onStateUpdate(update)
160 // #2. Unregister from parent job
Roman Elizarov58a7add2017-01-20 12:19:52 +0300161 registration?.unregister() // volatile read registration _after_ state was updated
Roman Elizarov41c5c8b2017-01-25 13:37:15 +0300162 // #3. Invoke completion handlers
Roman Elizarov53a0a402017-01-19 20:21:57 +0300163 val reason = (update as? CompletedExceptionally)?.cancelReason
164 var completionException: Throwable? = null
Roman Elizarov3754f952017-01-18 20:47:54 +0300165 expect.forEach<JobNode> { node ->
166 try {
167 node.invoke(reason)
168 } catch (ex: Throwable) {
Roman Elizarov53a0a402017-01-19 20:21:57 +0300169 completionException?.apply { addSuppressed(ex) } ?: run { completionException = ex }
Roman Elizarov3754f952017-01-18 20:47:54 +0300170 }
171 }
Roman Elizarov41c5c8b2017-01-25 13:37:15 +0300172 // #4. Do other (overridable) processing after completion handlers
Roman Elizarov53a0a402017-01-19 20:21:57 +0300173 completionException?.let { handleCompletionException(it) }
174 afterCompletion(update)
Roman Elizarov3754f952017-01-18 20:47:54 +0300175 return true
176 }
177
178 public override val isActive: Boolean get() = state is Active
179
180 public override fun onCompletion(handler: CompletionHandler): Job.Registration {
181 var nodeCache: JobNode? = null
182 while (true) { // lock-free loop on state
183 val state = this.state
184 if (state !is Active) {
185 handler((state as? Cancelled)?.cancelReason)
186 return EmptyRegistration
187 }
188 val node = nodeCache ?: makeNode(handler).apply { nodeCache = this }
Roman Elizarova5676592017-01-19 10:31:13 +0300189 state as ActiveList // assert type
Roman Elizarov3754f952017-01-18 20:47:54 +0300190 if (state.addLastIf(node) { this.state == state }) return node
191 }
192 }
193
194 public override fun cancel(reason: Throwable?): Boolean {
195 while (true) { // lock-free loop on state
196 val state = this.state as? Active ?: return false // quit if not active anymore
197 if (updateState(state, Cancelled(reason))) return true
198 }
199 }
200
Roman Elizarov53a0a402017-01-19 20:21:57 +0300201 /**
Roman Elizarov41c5c8b2017-01-25 13:37:15 +0300202 * Override to make linked state changes before completion handlers are invoked.
203 */
204 protected open fun onStateUpdate(update: Any?) {}
205
206 /**
Roman Elizarov53a0a402017-01-19 20:21:57 +0300207 * Override to process any exceptions that were encountered while invoking [onCompletion] handlers.
208 */
209 protected open fun handleCompletionException(closeException: Throwable) {
210 throw closeException
Roman Elizarov3754f952017-01-18 20:47:54 +0300211 }
212
Roman Elizarov53a0a402017-01-19 20:21:57 +0300213 /**
214 * Override for post-completion actions that need to do something with the state.
215 */
216 protected open fun afterCompletion(state: Any?) {}
217
Roman Elizarov3754f952017-01-18 20:47:54 +0300218 private fun makeNode(handler: CompletionHandler): JobNode =
219 (handler as? JobNode)?.also { require(it.job === this) }
220 ?: InvokeOnCompletion(this, handler)
221
Roman Elizarova5676592017-01-19 10:31:13 +0300222 protected interface Active
223
224 private class ActiveList : LockFreeLinkedListHead(), Active
Roman Elizarov3754f952017-01-18 20:47:54 +0300225
226 protected abstract class CompletedExceptionally {
Roman Elizarov58a7add2017-01-20 12:19:52 +0300227 abstract val cancelReason: Throwable // original reason or fresh CancellationException
228 abstract val exception: Throwable // the exception to be thrown in continuation
Roman Elizarov3754f952017-01-18 20:47:54 +0300229 }
230
Roman Elizarov58a7add2017-01-20 12:19:52 +0300231 protected class Cancelled(specifiedReason: Throwable?) : CompletedExceptionally() {
232 @Volatile
233 private var _cancelReason = specifiedReason // materialize CancellationException on first need
234
235 override val cancelReason: Throwable get() =
236 _cancelReason ?: // atomic read volatile var or else create new
237 CancellationException().also { _cancelReason = it }
238
Roman Elizarov3754f952017-01-18 20:47:54 +0300239 @Volatile
240 private var _exception: Throwable? = null // convert reason to CancellationException on first need
Roman Elizarov58a7add2017-01-20 12:19:52 +0300241
Roman Elizarov3754f952017-01-18 20:47:54 +0300242 override val exception: Throwable get() =
Roman Elizarov58a7add2017-01-20 12:19:52 +0300243 _exception ?: // atomic read volatile var or else build new
244 (cancelReason as? CancellationException ?:
245 CancellationException(cancelReason.message).apply { initCause(cancelReason) })
246 .also { _exception = it }
Roman Elizarov3754f952017-01-18 20:47:54 +0300247 }
248
249 protected class Failed(override val exception: Throwable) : CompletedExceptionally() {
250 override val cancelReason: Throwable
251 get() = exception
252 }
253}
254
255internal abstract class JobNode(
256 val job: Job
257) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler {
258 override fun unregister() {
259 // this is an object-allocation optimization -- do not remove if job is not active anymore
260 if (job.isActive) remove()
261 }
262
263 override abstract fun invoke(reason: Throwable?)
264}
265
266private class InvokeOnCompletion(
267 job: Job,
268 val handler: CompletionHandler
269) : JobNode(job) {
270 override fun invoke(reason: Throwable?) = handler.invoke(reason)
271 override fun toString() = "InvokeOnCompletion[${handler::class.java.name}@${Integer.toHexString(System.identityHashCode(handler))}]"
272}
273
274private class ResumeOnCompletion(
275 job: Job,
276 val continuation: Continuation<Unit>
277) : JobNode(job) {
278 override fun invoke(reason: Throwable?) = continuation.resume(Unit)
279 override fun toString() = "ResumeOnCompletion[$continuation]"
280}
281
282private class UnregisterOnCompletion(
283 job: Job,
284 val registration: Job.Registration
285) : JobNode(job) {
286 override fun invoke(reason: Throwable?) = registration.unregister()
287 override fun toString(): String = "UnregisterOnCompletion[$registration]"
288}
289
290private class CancelOnCompletion(
291 parentJob: Job,
292 val subordinateJob: Job
293) : JobNode(parentJob) {
294 override fun invoke(reason: Throwable?) { subordinateJob.cancel(reason) }
295 override fun toString(): String = "CancelOnCompletion[$subordinateJob]"
296}
297
298private object EmptyRegistration : Job.Registration {
299 override fun unregister() {}
300 override fun toString(): String = "EmptyRegistration"
301}
302
303private class CancelFutureOnCompletion(
304 job: Job,
305 val future: Future<*>
306) : JobNode(job) {
Roman Elizarova8331812017-01-24 10:04:31 +0300307 override fun invoke(reason: Throwable?) {
308 // Don't interrupt when cancelling future on completion, because no one is going to reset this
309 // interruption flag and it will cause spurious failures elsewhere
310 future.cancel(false)
311 }
Roman Elizarov3754f952017-01-18 20:47:54 +0300312 override fun toString() = "CancelFutureOnCompletion[$future]"
313}
Roman Elizarov53a0a402017-01-19 20:21:57 +0300314
315private class RemoveOnCompletion(
316 job: Job,
317 val node: LockFreeLinkedListNode
318) : JobNode(job) {
319 override fun invoke(reason: Throwable?) { node.remove() }
320 override fun toString() = "RemoveOnCompletion[$node]"
321}
Roman Elizarov58a7add2017-01-20 12:19:52 +0300322
323private class JobImpl(parent: Job? = null) : JobSupport() {
324 init { initParentJob(parent) }
325}