Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 1 | package kotlinx.coroutines.experimental |
| 2 | |
Roman Elizarov | a567659 | 2017-01-19 10:31:13 +0300 | [diff] [blame] | 3 | import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead |
| 4 | import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 5 | import java.util.concurrent.Future |
| 6 | import java.util.concurrent.atomic.AtomicReferenceFieldUpdater |
| 7 | import kotlin.coroutines.AbstractCoroutineContextElement |
| 8 | import kotlin.coroutines.Continuation |
| 9 | import kotlin.coroutines.CoroutineContext |
| 10 | |
| 11 | // --------------- core job interfaces --------------- |
| 12 | |
/**
 * A background job. A job is either _active_ (its initial state) or _completed_ (its final state).
 *
 * Invoking [cancel] at any time forces the job into the completed state immediately; such a job is
 * said to be _cancelled_.
 * The job found in the coroutine [context][CoroutineScope.context] represents the coroutine itself:
 * it stays active while the coroutine is working, and cancelling it aborts the coroutine by throwing
 * [CancellationException] inside the coroutine when it is suspended on a _cancellable_ suspension point.
 *
 * A job can have a _parent_. A job with a parent is cancelled when its parent completes.
 *
 * All functions on this interface are thread-safe.
 */
public interface Job : CoroutineContext.Element {
    /**
     * `true` for as long as this job has not completed yet.
     */
    public val isActive: Boolean

    /**
     * Registers a completion [handler]. What happens next depends on the state of this job:
     * if the job was already cancelled with [cancel], the handler is invoked immediately with
     * the cancellation reason; otherwise the handler is invoked once, when this job completes
     * (cancellation is a form of completion, too).
     *
     * The returned [Registration] can be used to [unregister][Registration.unregister] the handler
     * when it is no longer needed. There is no need to unregister after completion.
     */
    public fun onCompletion(handler: CompletionHandler): Registration

    /**
     * Cancels this activity with an optional cancellation [reason].
     *
     * When cancellation has a clear reason in the code, create an instance of [CancellationException]
     * at the original cancellation site and pass it here: this aids debugging by capturing both
     * the context of the cancellation and a text description of the reason.
     *
     * @return `true` if this job was cancelled as a result of this invocation,
     *         `false` otherwise (if it was already cancelled).
     */
    public fun cancel(reason: Throwable? = null): Boolean

    /**
     * Handle returned by [onCompletion]. Use it to [unregister] the handler if needed.
     * There is no need to unregister after completion.
     */
    public interface Registration {
        /**
         * Unregisters the completion handler.
         */
        public fun unregister()
    }

    /**
     * Key for [Job] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<Job> {
        /**
         * Creates a new job object, optionally making it a child of the [parent] job.
         */
        public operator fun invoke(parent: Job? = null): Job = JobImpl(parent)
    }
}
| 72 | |
/**
 * Handler that is registered with [Job.onCompletion] and invoked on completion of the job.
 * Its argument is the reason of the completion when one is available, or `null` otherwise.
 */
public typealias CompletionHandler = (Throwable?) -> Unit
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 74 | |
/**
 * Thrown by cancellable suspending functions if the [Job] of the coroutine is cancelled while it is suspended.
 * This is an alias for the standard [java.util.concurrent.CancellationException].
 */
public typealias CancellationException = java.util.concurrent.CancellationException
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 79 | |
/**
 * Arranges for the specified [registration] to be unregistered once this job is complete.
 * It is a shortcut for the code below, with a slightly more efficient implementation
 * (one fewer object created):
 * ```
 * onCompletion { registration.unregister() }
 * ```
 *
 * @return the registration of the installed completion handler in this job.
 */
public fun Job.unregisterOnCompletion(registration: Job.Registration): Job.Registration {
    val unregisterNode = UnregisterOnCompletion(this, registration)
    return onCompletion(unregisterNode)
}
| 89 | |
/**
 * Cancels a specified [future] when this job is complete.
 * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
 * ```
 * onCompletion { future.cancel(false) }
 * ```
 * The future is cancelled _without_ interrupting the thread that runs it.
 *
 * @return the registration of the installed completion handler in this job.
 */
public fun Job.cancelFutureOnCompletion(future: Future<*>): Job.Registration =
    onCompletion(CancelFutureOnCompletion(this, future))
| 99 | |
/**
 * Arranges for the specified list [node] to be removed from its list once this job is complete.
 *
 * @return the registration of the installed completion handler in this job.
 */
internal fun Job.removeOnCompletion(node: LockFreeLinkedListNode): Job.Registration {
    val removeNode = RemoveOnCompletion(this, node)
    return onCompletion(removeNode)
}
| 102 | |
/**
 * Suspends the coroutine until this job is complete. The invocation resumes normally (without exception)
 * once the job is complete, whatever the reason of its completion was.
 *
 * This suspending function is cancellable. If the [Job] of the invoking coroutine is completed while this
 * suspending function is suspended, this function immediately resumes with [CancellationException].
 */
public suspend fun Job.join() {
    // fast path: nothing to wait for when this job is complete already
    if (!isActive) return
    return suspendCancellableCoroutine { continuation ->
        // resume the waiting continuation as soon as this job completes ...
        val resumeRegistration = onCompletion(ResumeOnCompletion(this, continuation))
        // ... and drop that handler from this job once the continuation itself is complete
        continuation.unregisterOnCompletion(resumeRegistration)
    }
}
| 116 | |
| 117 | // --------------- utility classes to simplify job implementation |
| 118 | |
/**
 * A concrete implementation of [Job]. It is optionally a child to a parent job.
 * This job is cancelled when the parent is complete, but not vice versa.
 *
 * This is an open class designed for extension by more specific classes that might augment the
 * state and may store additional state information for completed jobs, like their result values.
 */
@Suppress("LeakingThis")
public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
    // Holds an ActiveList of completion handlers while the job is active;
    // any value that is not Active denotes a completed job.
    @Volatile
    private var state: Any? = ActiveList() // the list is replaced by the final state on completion

    // Registration of this job's cancel handler in the parent job (see initParentJob), if any.
    @Volatile
    private var registration: Job.Registration? = null

    protected companion object {
        // Atomic updater that performs compare-and-set transitions on the volatile `state` field.
        @JvmStatic
        private val STATE: AtomicReferenceFieldUpdater<JobSupport, Any?> =
            AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "state")
    }

    /**
     * Makes this job a child of the [parent] job, so that it is cancelled when the parent completes.
     * Does nothing when [parent] is `null`.
     *
     * Invoke at most once after construction, after all other initialization.
     *
     * @throws IllegalStateException if a parent registration was already installed.
     */
    public fun initParentJob(parent: Job?) {
        if (parent == null) return
        check(registration == null)
        // directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
        val newRegistration = parent.onCompletion(CancelOnCompletion(parent, this))
        registration = newRegistration
        // now check our state _after_ registering (see updateState order of actions)
        if (state !is Active) newRegistration.unregister()
    }

    /**
     * Returns the current state object of this job.
     */
    protected fun getState(): Any? = state

    /**
     * Tries to atomically move this job from the active state [expect] to the final state [update].
     * On success it performs, in this order: [onStateUpdate], unregistration from the parent job,
     * invocation of all registered completion handlers, [handleCompletionException] (only when some
     * handler has thrown; exceptions after the first one are attached to it as suppressed) and [afterCompletion].
     *
     * Note that the default [handleCompletionException] rethrows, in which case the exception propagates
     * out of this function after the state was already updated and [afterCompletion] is not invoked.
     *
     * @param expect the expected current state; must be the active state previously read from this job.
     * @param update the new state; must not be an [Active] state.
     * @return `true` if the state was updated by this invocation, `false` if the current state is
     *         not [expect] anymore.
     */
    protected fun updateState(expect: Any, update: Any?): Boolean {
        expect as ActiveList // assert type
        require(update !is Active) // only active -> inactive transition is allowed
        if (!STATE.compareAndSet(this, expect, update)) return false
        // #1. Update linked state before invoking completion handlers
        onStateUpdate(update)
        // #2. Unregister from parent job
        registration?.unregister() // volatile read registration _after_ state was updated
        // #3. Invoke completion handlers
        val reason = (update as? CompletedExceptionally)?.cancelReason
        var completionException: Throwable? = null
        expect.forEach<JobNode> { node ->
            try {
                node.invoke(reason)
            } catch (ex: Throwable) {
                // keep the first exception as the primary one and attach all later ones to it
                completionException?.apply { addSuppressed(ex) } ?: run { completionException = ex }
            }
        }
        // #4. Do other (overridable) processing after completion handlers
        completionException?.let { handleCompletionException(it) }
        afterCompletion(update)
        return true
    }

    public override val isActive: Boolean get() = state is Active

    /**
     * Registers a completion [handler].
     *
     * If this job is complete already, the handler is invoked immediately, in the calling thread,
     * with the reason of exceptional completion (or with `null` for any other final state) and a no-op
     * registration is returned. Otherwise, the handler is added to the list of handlers of this active job
     * and the returned registration can be used to remove it.
     */
    public override fun onCompletion(handler: CompletionHandler): Job.Registration {
        var nodeCache: JobNode? = null
        while (true) { // lock-free loop on state
            val state = this.state
            if (state !is Active) {
                // Pass the same reason that handlers registered before completion receive in updateState,
                // so that both cancelled and failed jobs report their reason to late handlers, too.
                handler((state as? CompletedExceptionally)?.cancelReason)
                return EmptyRegistration
            }
            // allocate the node at most once even if the loop has to retry
            val node = nodeCache ?: makeNode(handler).apply { nodeCache = this }
            state as ActiveList // assert type
            // add the node only while the state is still the same list, otherwise retry with the new state
            if (state.addLastIf(node) { this.state == state }) return node
        }
    }

    /**
     * Cancels this job with an optional [reason] by moving it from its active state to [Cancelled].
     *
     * @return `true` if this invocation has cancelled the job, `false` if the job was not active anymore.
     */
    public override fun cancel(reason: Throwable?): Boolean {
        while (true) { // lock-free loop on state
            val state = this.state as? Active ?: return false // quit if not active anymore
            if (updateState(state, Cancelled(reason))) return true
        }
    }

    /**
     * Override to make linked state changes before completion handlers are invoked.
     */
    protected open fun onStateUpdate(update: Any?) {}

    /**
     * Override to process any exceptions that were encountered while invoking [onCompletion] handlers.
     * The default implementation rethrows [closeException].
     */
    protected open fun handleCompletionException(closeException: Throwable) {
        throw closeException
    }

    /**
     * Override for post-completion actions that need to do something with the state.
     */
    protected open fun afterCompletion(state: Any?) {}

    // Reuses the handler itself when it already is a JobNode of this job, otherwise wraps it into a node.
    private fun makeNode(handler: CompletionHandler): JobNode =
        (handler as? JobNode)?.also { require(it.job === this) }
            ?: InvokeOnCompletion(this, handler)

    /**
     * Marker for state objects of a job that is still active.
     */
    protected interface Active

    // The active state: a list of the registered completion handler nodes.
    private class ActiveList : LockFreeLinkedListHead(), Active

    /**
     * Base class for final states of a job that has completed exceptionally.
     */
    protected abstract class CompletedExceptionally {
        abstract val cancelReason: Throwable // original reason or fresh CancellationException
        abstract val exception: Throwable // the exception to be thrown in continuation
    }

    /**
     * Final state of a cancelled job. Both properties are materialized on first need and cached.
     * Concurrent first reads may race and each create an instance, because the caches are plain volatile fields.
     */
    protected class Cancelled(specifiedReason: Throwable?) : CompletedExceptionally() {
        @Volatile
        private var _cancelReason = specifiedReason // materialize CancellationException on first need

        override val cancelReason: Throwable get() =
            _cancelReason ?: // atomic read volatile var or else create new
                CancellationException().also { _cancelReason = it }

        @Volatile
        private var _exception: Throwable? = null // convert reason to CancellationException on first need

        override val exception: Throwable get() =
            _exception ?: // atomic read volatile var or else build new
                (cancelReason as? CancellationException ?:
                        CancellationException(cancelReason.message).apply { initCause(cancelReason) })
                    .also { _exception = it }
    }

    /**
     * Final state of a job that has failed with the specified [exception], which is also its [cancelReason].
     */
    protected class Failed(override val exception: Throwable) : CompletedExceptionally() {
        override val cancelReason: Throwable
            get() = exception
    }
}
| 254 | |
/**
 * Base class for completion handlers that are stored directly as nodes in the handler list of a [job].
 * A node serves both as the handler itself and as the [Job.Registration] of that handler.
 */
internal abstract class JobNode(
    val job: Job
) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler {
    /**
     * Removes this node from the list, but only while the [job] is still active.
     */
    override fun unregister() {
        // this is an object-allocation optimization -- do not remove if job is not active anymore
        if (!job.isActive) return
        remove()
    }

    /**
     * Invoked on completion of the [job] with the [reason] of the completion, if any.
     */
    abstract override fun invoke(reason: Throwable?)
}
| 265 | |
/**
 * Handler node that delegates completion of the job to an arbitrary [handler] function.
 */
private class InvokeOnCompletion(
    job: Job,
    val handler: CompletionHandler
) : JobNode(job) {
    override fun invoke(reason: Throwable?) {
        handler.invoke(reason)
    }

    // identifies the wrapped handler by its class name and identity hash code
    override fun toString(): String {
        return "InvokeOnCompletion[${handler::class.java.name}@${Integer.toHexString(System.identityHashCode(handler))}]"
    }
}
| 273 | |
/**
 * Handler node that resumes the [continuation] normally when the job completes,
 * regardless of the completion reason.
 */
private class ResumeOnCompletion(
    job: Job,
    val continuation: Continuation<Unit>
) : JobNode(job) {
    override fun invoke(reason: Throwable?) {
        continuation.resume(Unit)
    }

    override fun toString(): String {
        return "ResumeOnCompletion[$continuation]"
    }
}
| 281 | |
/**
 * Handler node that unregisters the specified [registration] when the job completes.
 */
private class UnregisterOnCompletion(
    job: Job,
    val registration: Job.Registration
) : JobNode(job) {
    override fun invoke(reason: Throwable?) {
        registration.unregister()
    }

    override fun toString(): String {
        return "UnregisterOnCompletion[$registration]"
    }
}
| 289 | |
/**
 * Handler node, registered in the parent job, that cancels the [subordinateJob]
 * with the parent's completion reason when the parent job completes.
 */
private class CancelOnCompletion(
    parentJob: Job,
    val subordinateJob: Job
) : JobNode(parentJob) {
    override fun invoke(reason: Throwable?) {
        // the boolean result of cancel is intentionally ignored
        subordinateJob.cancel(reason)
    }

    override fun toString(): String {
        return "CancelOnCompletion[$subordinateJob]"
    }
}
| 297 | |
/**
 * A registration that has nothing to unregister.
 */
private object EmptyRegistration : Job.Registration {
    override fun unregister() {
        // nothing to do
    }

    override fun toString(): String {
        return "EmptyRegistration"
    }
}
| 302 | |
/**
 * Handler node that cancels the [future], without interrupting it, when the job completes.
 */
private class CancelFutureOnCompletion(
    job: Job,
    val future: Future<*>
) : JobNode(job) {
    override fun invoke(reason: Throwable?) {
        // Don't interrupt when cancelling future on completion, because no one is going to reset this
        // interruption flag and it will cause spurious failures elsewhere
        val mayInterruptIfRunning = false
        future.cancel(mayInterruptIfRunning)
    }

    override fun toString(): String {
        return "CancelFutureOnCompletion[$future]"
    }
}
Roman Elizarov | 53a0a40 | 2017-01-19 20:21:57 +0300 | [diff] [blame] | 314 | |
/**
 * Handler node that removes the specified list [node] from its list when the job completes.
 */
private class RemoveOnCompletion(
    job: Job,
    val node: LockFreeLinkedListNode
) : JobNode(job) {
    override fun invoke(reason: Throwable?) {
        // the result of remove is intentionally ignored
        node.remove()
    }

    override fun toString(): String {
        return "RemoveOnCompletion[$node]"
    }
}
Roman Elizarov | 58a7add | 2017-01-20 12:19:52 +0300 | [diff] [blame] | 322 | |
/**
 * Plain [JobSupport] without any additional state, optionally linked to a [parent] job on construction.
 */
private class JobImpl(parent: Job? = null) : JobSupport() {
    init {
        // link to the parent as the last step of construction
        initParentJob(parent)
    }
}