`defer` coroutine builder is renamed to `async`.
`lazyDefer` is deprecated, `async` has an optional `start` parameter instead.
`LazyDeferred` interface is deprecated, lazy start functionality is integrated into `Job` interface.
`launch` has an optional `start` parameter for lazily started coroutines.
`Job.start` and `Job.isCompleted` are introduced.
`Job.join` is now a member function.
Internal `JobSupport` state machine is enhanced to support _new_ (not-started-yet) state.
So, lazy coroutines do not need a separate state variable to track their started/not-started (new/active) status.
Example on async-style functions is added to coroutines guide.
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 15e7a72..1422db5 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -23,24 +23,32 @@
/**
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
- * The running coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
+ * The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
*
* The [context] for the new coroutine must be explicitly specified.
* See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
*
+ * An optional [start] parameter can be set to `false` to start coroutine _lazily_. When `start = false`,
+ * the coroutine [Job] is created in _new_ state. It can be explicitly started with [start][Job.start] function
+ * and will be started implicitly on the first invocation of [join][Job.join].
+ *
* Uncaught exceptions in this coroutine cancel parent job in the context by default
* (unless [CoroutineExceptionHandler] is explicitly specified), which means that when `launch` is used with
* the context of another coroutine, then any uncaught exception leads to the cancellation of parent coroutine.
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*/
-fun launch(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit): Job =
- StandaloneCoroutine(newCoroutineContext(context)).apply {
- initParentJob(context[Job])
- block.startCoroutine(this, this)
- }
+fun launch(context: CoroutineContext, start: Boolean = true, block: suspend CoroutineScope.() -> Unit): Job {
+ val newContext = newCoroutineContext(context)
+ val coroutine = if (start)
+ StandaloneCoroutine(newContext, active = true) else
+ LazyStandaloneCoroutine(newContext, block)
+ coroutine.initParentJob(context[Job])
+ if (start) block.startCoroutine(coroutine, coroutine)
+ return coroutine
+}
/**
* Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns
@@ -57,7 +65,7 @@
}
/**
- * Runs new coroutine and *blocks* current thread *interruptibly* until its completion.
+ * Runs new coroutine and **blocks** current thread _interruptibly_ until its completion.
* This function should not be used from coroutine. It is designed to bridge regular blocking code
* to libraries that are written in suspending style, to be used in `main` functions and in tests.
*
@@ -84,15 +92,25 @@
// --------------- implementation ---------------
-private class StandaloneCoroutine(
- val parentContext: CoroutineContext
-) : AbstractCoroutine<Unit>(parentContext) {
+private open class StandaloneCoroutine(
+ val parentContext: CoroutineContext,
+ active: Boolean
+) : AbstractCoroutine<Unit>(parentContext, active) {
override fun afterCompletion(state: Any?) {
// note the use of the parent's job context below!
if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
}
}
+private class LazyStandaloneCoroutine(
+ parentContext: CoroutineContext,
+ val block: suspend CoroutineScope.() -> Unit
+) : StandaloneCoroutine(parentContext, active = false) {
+ override fun onStart() {
+ block.startCoroutine(this, this)
+ }
+}
+
private class InnerCoroutine<in T>(
override val context: CoroutineContext,
continuation: Continuation<T>
@@ -104,7 +122,7 @@
context: CoroutineContext,
val blockedThread: Thread,
val hasPrivateEventLoop: Boolean
-) : AbstractCoroutine<T>(context) {
+) : AbstractCoroutine<T>(context, active = true) {
val eventLoop: EventLoop? = context[ContinuationInterceptor] as? EventLoop
override fun afterCompletion(state: Any?) {
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 53a0940..4336c97 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -31,9 +31,11 @@
* with the specified cancel cause.
*
* Cancellable continuation has three states:
- * * _Active_ (initial state) -- [isActive] `true`, [isCancelled] `false`.
- * * _Resumed_ (final _completed_ state) -- [isActive] `false`, [isCancelled] `false`.
- * * _Canceled_ (final _completed_ state) -- [isActive] `false`, [isCancelled] `true`.
+ *
+ * | **State** | [isActive] | [isCompleted] | [isCancelled] |
+ * | _Active_ (initial state) | `true` | `false` | `false` |
+ * | _Resumed_ (final _completed_ state) | `false` | `true` | `false` |
+ * | _Canceled_ (final _completed_ state)| `false` | `true` | `true` |
*
* Invocation of [cancel] transitions this continuation from _active_ to _cancelled_ state, while
* invocation of [resume] or [resumeWithException] transitions it from _active_ to _resumed_ state.
@@ -43,7 +45,9 @@
*/
public interface CancellableContinuation<in T> : Continuation<T>, Job {
/**
- * Returns `true` if this continuation was [cancelled][cancel]. It implies that [isActive] is `false`.
+ * Returns `true` if this continuation was [cancelled][cancel].
+ *
+ * It implies that [isActive] is `false` and [isCompleted] is `true`.
*/
val isCancelled: Boolean
@@ -105,7 +109,7 @@
internal class SafeCancellableContinuation<in T>(
private val delegate: Continuation<T>,
private val parentJob: Job?
-) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {
+) : AbstractCoroutine<T>(delegate.context, active = true), CancellableContinuation<T> {
// only updated from the thread that invoked suspendCancellableCoroutine
@Volatile
@@ -144,7 +148,7 @@
while (true) { // lock-free loop on state
val state = getState() // atomic read
when (state) {
- is Active -> if (tryUpdateState(state, value)) return state
+ is Incomplete -> if (tryUpdateState(state, value)) return state
else -> return null // cannot resume -- not active anymore
}
}
@@ -154,7 +158,7 @@
while (true) { // lock-free loop on state
val state = getState() // atomic read
when (state) {
- is Active -> if (tryUpdateState(state, CompletedExceptionally(exception))) return state
+ is Incomplete -> if (tryUpdateState(state, CompletedExceptionally(exception))) return state
else -> return null // cannot resume -- not active anymore
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
index e9ced21..abc54d2 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
@@ -45,6 +45,9 @@
override fun dispatch(context: CoroutineContext, block: Runnable) { throw UnsupportedOperationException() }
}
+/**
+ * **Deprecated**: `Here` was renamed to `Unconfined`.
+ */
@Deprecated(message = "`Here` was renamed to `Unconfined`",
replaceWith = ReplaceWith(expression = "Unconfined"))
public typealias Here = Unconfined
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
index 3f510bc..1c4bb86 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
@@ -52,17 +52,22 @@
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
- DispatchedContinuation<T>(this, continuation)
+ DispatchedContinuation(this, continuation)
+ /**
+ * **Error**: Operator '+' on two CoroutineDispatcher objects is meaningless.
+ * CoroutineDispatcher is a coroutine context element and `+` is a set-sum operator for coroutine contexts.
+ * The dispatcher to the right of `+` just replaces the dispatcher the left of `+`.
+ */
@Suppress("DeprecatedCallableAddReplaceWith")
@Deprecated(message = "Operator '+' on two CoroutineDispatcher objects is meaningless. " +
"CoroutineDispatcher is a coroutine context element and `+` is a set-sum operator for coroutine contexts. " +
- "The dispatcher to the right of `+` just replaces the dispacher the left of `+`.",
+ "The dispatcher to the right of `+` just replaces the dispatcher the left of `+`.",
level = DeprecationLevel.ERROR)
public operator fun plus(other: CoroutineDispatcher) = other
}
-internal class DispatchedContinuation<T>(
+internal class DispatchedContinuation<in T>(
val dispatcher: CoroutineDispatcher,
val continuation: Continuation<T>
): Continuation<T> by continuation {
@@ -100,7 +105,7 @@
if (dispatcher.isDispatchNeeded(context))
dispatcher.dispatch(context, Runnable {
withCoroutineContext(context) {
- if (job?.isActive == false)
+ if (job?.isCompleted == true)
continuation.resumeWithException(job.getCompletionException())
else
continuation.resume(value)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
index 8ac8d54..b5b99e2 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
@@ -21,11 +21,20 @@
/**
* Receiver interface for generic coroutine builders, so that the code inside coroutine has a convenient access
- * to its [context] and cancellation status via [isActive].
+ * to its [context] and its cancellation status via [isActive].
*/
public interface CoroutineScope {
/**
- * Returns `true` when this coroutine is still active (was not cancelled).
+ * Returns `true` when this coroutine is still active (has not completed yet).
+ *
+ * Check this property in long-running computation loops to support cancellation:
+ * ```
+ * while (isActive) {
+ * // do some computation
+ * }
+ * ```
+ *
+ * This property is a shortcut for `context[Job]!!.isActive`. See [context] and [Job].
*/
public val isActive: Boolean
@@ -41,14 +50,17 @@
* It stores the result of continuation in the state of the job.
*/
@Suppress("LeakingThis")
-internal abstract class AbstractCoroutine<in T>(context: CoroutineContext) : JobSupport(), Continuation<T>, CoroutineScope {
+internal abstract class AbstractCoroutine<in T>(
+ context: CoroutineContext,
+ active: Boolean
+) : JobSupport(active), Continuation<T>, CoroutineScope {
override val context: CoroutineContext = context + this // merges this job into this context
final override fun resume(value: T) {
while (true) { // lock-free loop on state
val state = getState() // atomic read
when (state) {
- is Active -> if (updateState(state, value)) return
+ is Incomplete -> if (updateState(state, value)) return
is Cancelled -> return // ignore resumes on cancelled continuation
else -> throw IllegalStateException("Already resumed, but got value $value")
}
@@ -59,7 +71,7 @@
while (true) { // lock-free loop on state
val state = getState() // atomic read
when (state) {
- is Active -> if (updateState(state, CompletedExceptionally(exception))) return
+ is Incomplete -> if (updateState(state, CompletedExceptionally(exception))) return
is Cancelled -> {
// ignore resumes on cancelled continuation, but handle exception if a different one is here
if (exception != state.exception) handleCoroutineException(context, exception)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index ab61173..515da4d 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -20,103 +20,129 @@
import kotlin.coroutines.experimental.startCoroutine
/**
- * Deferred value is conceptually a non-blocking cancellable future.
- * It is created with [defer] coroutine builder.
+ * Deferred value is a non-blocking cancellable future.
+ * It is created with [async] coroutine builder.
* It is in [active][isActive] state while the value is being computed.
*
- * Deferred value has four states:
+ * Deferred value has four or five possible states.
*
- * * _Active_ (initial state) -- [isActive] `true`, [isCompletedExceptionally] `false`,
- * and [isCancelled] `false`.
- * Both [getCompleted] and [getCompletionException] throw [IllegalStateException].
- * * _Computed_ (final _completed_ state) -- [isActive] `false`,
- * [isCompletedExceptionally] `false`, [isCancelled] `false`.
- * * _Failed_ (final _completed_ state) -- [isActive] `false`,
- * [isCompletedExceptionally] `true`, [isCancelled] `false`.
- * * _Canceled_ (final _completed_ state) -- [isActive] `false`,
- * [isCompletedExceptionally] `true`, [isCancelled] `true`.
+ * | **State** | [isActive] | [isCompleted] | [isCompletedExceptionally] | [isCancelled] |
+ * | _New_ (optional initial state) | `false` | `false` | `false` | `false` |
+ * | _Active_ (default initial state) | `true` | `false` | `false` | `false` |
+ * | _Resolved_ (final state) | `false` | `true` | `false` | `false` |
+ * | _Failed_ (final state) | `false` | `true` | `true` | `false` |
+ * | _Cancelled_ (final state) | `false` | `true` | `true` | `true` |
+ *
+ * Usually, a deferred value is created in _active_ state (it is created and started), so its only visible
+ * states are _active_ and _completed_ (_resolved_, _failed_, or _cancelled_) state.
+ * However, [async] coroutine builder has an optional `start` parameter that creates a deferred value in _new_ state
+ * when this parameter is set to `false`.
+ * Such a deferred can be be made _active_ by invoking [start], [join], or [await].
*/
public interface Deferred<out T> : Job {
/**
* Returns `true` if computation of this deferred value has _completed exceptionally_ -- it had
* either _failed_ with exception during computation or was [cancelled][cancel].
- * It implies that [isActive] is `false`.
+ *
+ * It implies that [isActive] is `false` and [isCompleted] is `true`.
*/
val isCompletedExceptionally: Boolean
/**
* Returns `true` if computation of this deferred value was [cancelled][cancel].
- * It implies that [isActive] is `false` and [isCompletedExceptionally] is `true`.
+ *
+ * It implies that [isActive] is `false`, [isCompleted] is `true`, and [isCompletedExceptionally] is `true`.
*/
val isCancelled: Boolean
/**
* Awaits for completion of this value without blocking a thread and resumes when deferred computation is complete.
* This suspending function is cancellable.
+ *
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*/
public suspend fun await(): T
/**
- * Returns *completed* result or throws [IllegalStateException] if this deferred value is still [isActive].
- * It throws the corresponding exception if this deferred has completed exceptionally.
+ * Returns *completed* result or throws [IllegalStateException] if this deferred value has not
+ * [completed][isCompleted] yet. It throws the corresponding exception if this deferred has
+ * [completed exceptionally][isCompletedExceptionally].
+ *
* This function is designed to be used from [onCompletion] handlers, when there is an absolute certainty that
* the value is already complete.
*/
public fun getCompleted(): T
+
+ /**
+ * **Deprecated**: Use `isActive`.
+ */
+ @Deprecated(message = "Use `isActive`", replaceWith = ReplaceWith("isActive"))
+ public val isComputing: Boolean get() = isActive
}
/**
- * Starts new coroutine and returns its result as an implementation of [Deferred].
- * The running coroutine is cancelled when the resulting object is [cancelled][Job.cancel].
+ * Creates new coroutine and returns its future result as an implementation of [Deferred].
*
+ * The running coroutine is cancelled when the resulting object is [cancelled][Job.cancel].
* The [context] for the new coroutine must be explicitly specified.
* See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
+ *
+ * An optional [start] parameter can be set to `false` to start coroutine _lazily_. When `start = false`,
+ * the resulting [Deferred] is created in _new_ state. It can be explicitly started with [start][Job.start]
+ * function and will be started implicitly on the first invocation of [join][Job.join] or [await][Deferred.await].
+ *
+ * By default, the coroutine is immediately started. Set an optional [start] parameters to `false`
+ * to create coroutine without starting it. In this case it will be _lazy_ and will start
*/
+public fun <T> async(context: CoroutineContext, start: Boolean = true, block: suspend CoroutineScope.() -> T) : Deferred<T> {
+ val newContext = newCoroutineContext(context)
+ val coroutine = if (start)
+ DeferredCoroutine<T>(newContext, active = true) else
+ LazyDeferredCoroutine(newContext, block)
+ coroutine.initParentJob(context[Job])
+ if (start) block.startCoroutine(coroutine, coroutine)
+ return coroutine
+}
+
+/**
+ * **Deprecated**: `defer` was renamed to `async`.
+ */
+@Deprecated(message = "`defer` was renamed to `async`", level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("async(context, block = block)"))
public fun <T> defer(context: CoroutineContext, block: suspend CoroutineScope.() -> T) : Deferred<T> =
- DeferredCoroutine<T>(newCoroutineContext(context)).apply {
- initParentJob(context[Job])
- block.startCoroutine(this, this)
- }
+ async(context, block = block)
-internal open class DeferredCoroutine<T>(
- context: CoroutineContext
-) : AbstractCoroutine<T>(context), Deferred<T> {
- protected open fun start(): Boolean = false // LazyDeferredCoroutine overrides
-
+private open class DeferredCoroutine<T>(
+ context: CoroutineContext,
+ active: Boolean
+) : AbstractCoroutine<T>(context, active), Deferred<T> {
override val isCompletedExceptionally: Boolean get() = getState() is CompletedExceptionally
override val isCancelled: Boolean get() = getState() is Cancelled
@Suppress("UNCHECKED_CAST")
suspend override fun await(): T {
- // quick check if already complete (avoid extra object creation)
- getState().let { state ->
- if (state !is Active) {
+ // fast-path -- check state (avoid extra object creation)
+ while(true) { // lock-free loop on state
+ val state = this.getState()
+ if (state !is Incomplete) {
+ // already complete -- just return result
if (state is CompletedExceptionally) throw state.exception
return state as T
+
}
+ if (startInternal(state) >= 0) break // break unless needs to retry
}
- if (start()) { // LazyDeferredCoroutine overrides
- // recheck state (may have started & already completed
- getState().let { state ->
- if (state !is Active) {
- if (state is CompletedExceptionally) throw state.exception
- return state as T
- }
- }
- }
- // Note: await is cancellable itself!
- return awaitGetValue()
+ return awaitSuspend() // slow-path
}
@Suppress("UNCHECKED_CAST")
- private suspend fun awaitGetValue(): T = suspendCancellableCoroutine { cont ->
+ private suspend fun awaitSuspend(): T = suspendCancellableCoroutine { cont ->
cont.unregisterOnCompletion(onCompletion {
val state = getState()
- check(state !is Active)
+ check(state !is Incomplete)
if (state is CompletedExceptionally)
cont.resumeWithException(state.exception)
else
@@ -127,12 +153,24 @@
@Suppress("UNCHECKED_CAST")
override fun getCompleted(): T {
val state = getState()
- check(state !is Active) { "This deferred value is still active" }
+ check(state !is Incomplete) { "This deferred value has not completed yet" }
if (state is CompletedExceptionally) throw state.exception
return state as T
}
// for nicer debugging
- override fun toString(): String = "${javaClass.simpleName}{" +
- (if (isActive) "isActive=true" else "completed=${getState()}") + "}"
+ override fun toString(): String {
+ val state = getState()
+ val result = if (state is Incomplete) "" else "[$state]"
+ return "${javaClass.simpleName}{${describeState(state)}}$result@${Integer.toHexString(System.identityHashCode(this))}"
+ }
+}
+
+private class LazyDeferredCoroutine<T>(
+ context: CoroutineContext,
+ val block: suspend CoroutineScope.() -> T
+) : DeferredCoroutine<T>(context, active = false) {
+ override fun onStart() {
+ block.startCoroutine(this, this)
+ }
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
index 38bc013..041aa9c 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
@@ -74,7 +74,7 @@
queue.addLast(node)
true
} else
- queue.addLastIf(node) { parentJob!!.isActive }
+ queue.addLastIf(node) { !parentJob!!.isCompleted }
if (added) {
if (Thread.currentThread() !== thread)
LockSupport.unpark(thread)
@@ -96,5 +96,7 @@
abstract class Node : LockFreeLinkedListNode(), Runnable
class Dispatch(block: Runnable) : Node(), Runnable by block
+
+ override fun toString(): String = "EventLoopImpl@${Integer.toHexString(System.identityHashCode(this))}"
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 39e0d60..8c06a34 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -19,6 +19,7 @@
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
import java.util.concurrent.Future
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
import kotlin.coroutines.experimental.Continuation
@@ -27,13 +28,21 @@
// --------------- core job interfaces ---------------
/**
- * A background job.
+ * A background job. It is created with [launch] coroutine builder or with a
+ * [`Job()`][Job.Key.invoke] factory function.
* A job can be _cancelled_ at any time with [cancel] function that forces it to become _completed_ immediately.
*
- * It has two states:
- * * _Active_ (initial state) -- [isActive] `true`,
- * [getCompletionException] throws [IllegalStateException].
- * * _Completed_ (final state) -- [isActive] `false`.
+ * A job has two or three states:
+ *
+ * | **State** | [isActive] | [isCompleted] |
+ * | _New_ (optional initial state) | `false` | `false` |
+ * | _Active_ (default initial state) | `true` | `false` |
+ * | _Completed_ (final state) | `false` | `true` |
+ *
+ * Usually, a job is created in _active_ state (it is created and started), so its only visible
+ * states are _active_ and _completed_. However, coroutine builders that provide an optional `start` parameter
+ * create a coroutine in _new_ state when this parameter is set to `false`. Such a job can
+ * be made _active_ by invoking [start] or [join].
*
* A job in the coroutine [context][CoroutineScope.context] represents the coroutine itself.
* A job is active while the coroutine is working and job's cancellation aborts the coroutine when
@@ -51,21 +60,34 @@
*/
public companion object Key : CoroutineContext.Key<Job> {
/**
- * Creates new job object. It is optionally a child of a [parent] job.
+ * Creates a new job object in _active_ state.
+ * It is optionally a child of a [parent] job.
*/
public operator fun invoke(parent: Job? = null): Job = JobImpl(parent)
}
/**
- * Returns `true` when job is still active.
+ * Returns `true` when this job is active.
*/
public val isActive: Boolean
/**
+ * Returns `true` when this job has completed for any reason.
+ */
+ public val isCompleted: Boolean
+
+ /**
+ * Starts coroutine related to this job (if any) if it was not started yet.
+ * The result `true` if this invocation actually started coroutine or `false`
+ * if it was already started or completed.
+ */
+ public fun start(): Boolean
+
+ /**
* Returns the exception that signals the completion of this job -- it returns the original
* [cancel] cause or an instance of [CancellationException] if this job had completed
* normally or was cancelled without a cause. This function throws
- * [IllegalStateException] when invoked for an [active][isActive] job.
+ * [IllegalStateException] when invoked for an job that has not [completed][isCompleted] yet.
*
* The [cancellable][suspendCancellableCoroutine] suspending functions throw this exception
* when trying to suspend in the context of this job.
@@ -85,6 +107,16 @@
public fun onCompletion(handler: CompletionHandler): Registration
/**
+ * Suspends coroutine until this job is complete. This invocation resumes normally (without exception)
+ * when the job is complete for any reason. This function also [starts][Job.start] the corresponding coroutine
+ * if the [Job] was still in _new_ state.
+ *
+ * This suspending function is cancellable. If the [Job] of the invoking coroutine is completed while this
+ * suspending function is suspended, this function immediately resumes with [CancellationException].
+ */
+ public suspend fun join()
+
+ /**
* Cancel this activity with an optional cancellation [cause]. The result is `true` if this job was
* cancelled as a result of this invocation and `false` otherwise
* (if it was already _completed_ or if it is [NonCancellable]).
@@ -96,6 +128,11 @@
*/
public fun cancel(cause: Throwable? = null): Boolean
+ /**
+ * **Error**: Operator '+' on two Job objects is meaningless.
+ * Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts.
+ * The job to the right of `+` just replaces the job the left of `+`.
+ */
@Suppress("DeprecatedCallableAddReplaceWith")
@Deprecated(message = "Operator '+' on two Job objects is meaningless. " +
"Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts. " +
@@ -115,6 +152,9 @@
}
}
+/**
+ * Handler for [Job.onCompletion].
+ */
public typealias CompletionHandler = (Throwable?) -> Unit
/**
@@ -124,6 +164,7 @@
/**
* Unregisters a specified [registration] when this job is complete.
+ *
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
* onCompletion { registration.unregister() }
@@ -134,6 +175,7 @@
/**
* Cancels a specified [future] when this job is complete.
+ *
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
* onCompletion { future.cancel(true) }
@@ -146,17 +188,21 @@
onCompletion(RemoveOnCompletion(this, node))
/**
- * Suspends coroutine until this job is complete. This invocation resumes normally (without exception)
- * when the job is complete for any reason.
- *
- * This suspending function is cancellable. If the [Job] of the invoking coroutine is completed while this
- * suspending function is suspended, this function immediately resumes with [CancellationException].
+ * **Deprecated**: `join` is now a member function of `Job`.
*/
-public suspend fun Job.join() {
- if (!isActive) return // fast path
- return suspendCancellableCoroutine { cont ->
- cont.unregisterOnCompletion(onCompletion(ResumeOnCompletion(this, cont)))
- }
+@Suppress("EXTENSION_SHADOWED_BY_MEMBER", "DeprecatedCallableAddReplaceWith")
+@Deprecated(message = "`join` is now a member function of `Job`")
+public suspend fun Job.join() = this.join()
+
+/**
+ * No-op implementation of [Job.Registration].
+ */
+public object EmptyRegistration : Job.Registration {
+ /** Does not do anything. */
+ override fun unregister() {}
+
+ /** Returns "EmptyRegistration" string. */
+ override fun toString(): String = "EmptyRegistration"
}
// --------------- utility classes to simplify job implementation
@@ -167,45 +213,53 @@
*
* This is an open class designed for extension by more specific classes that might augment the
* state and mare store addition state information for completed jobs, like their result values.
+ *
+ * Initial state of this job is either _active_ when `active = true` or _new_ when `active = false`.
*/
-internal open class JobSupport : AbstractCoroutineContextElement(Job), Job {
+internal open class JobSupport(active: Boolean) : AbstractCoroutineContextElement(Job), Job {
/*
- === States ===
- name state class is Active?
- ------ ------------ ---------
- EMPTY Empty : Active -- no completion listener
- SINGLE JobNode : Active -- a single completion listener
- SINGLE+ JobNode : Active -- a single completion listener + NodeList added as its next
- LIST NodeList : Active -- a list of listeners (promoted just once, does not got back to JobNode/Empty)
- FINAL_C Cancelled : !Active -- cancelled (final state)
- FINAL_F Failed : !Active -- failed for other reason (final state)
- FINAL_R <any> : !Active -- produced some result
+ === Internal states ===
+
+ name state class public state description
+ ------ ------------ ------------ -----------
+ EMPTY_N EmptyNew : New no completion listeners
+ EMPTY_A EmptyActive : Active no completion listeners
+ SINGLE JobNode : Active a single completion listener
+ SINGLE+ JobNode : Active a single completion listener + NodeList added as its next
+ LIST_N NodeList : New a list of listeners (promoted once, does not got back to EmptyNew)
+ LIST_A NodeList : Active a list of listeners (promoted once, does not got back to JobNode/EmptyActive)
+ FINAL_C Cancelled : Completed cancelled (final state)
+ FINAL_F Failed : Completed failed for other reason (final state)
+ FINAL_R <any> : Completed produced some result
=== Transitions ===
-
- Active states !Active states
- +---------+ +----------+
- initial -+-> | EMPTY | ------------> | FINAL_* |
- | +---------+ +----------+
- | | ^ ^
- | V | |
- | +---------+ |
- | | SINGLE | --------------------+
- | +---------+ |
- | | |
- | V |
- | +---------+ |
- +-- | SINGLE+ | --------------------+
- +---------+ |
- | |
- V |
- +---------+ |
- | LIST | --------------------+
- +---------+
+
+ New states Active states Inactive states
+ +---------+ +---------+ +----------+
+ | EMPTY_N | --+-> | EMPTY_A | --+-> | FINAL_* |
+ +---------+ | +---------+ | +----------+
+ | | | ^ |
+ | | V | |
+ | | +---------+ |
+ | | | SINGLE | --+
+ | | +---------+ |
+ | | | |
+ | | V |
+ | | +---------+ |
+ | +-- | SINGLE+ | --+
+ | +---------+ |
+ | | |
+ V V |
+ +---------+ +---------+ |
+ | LIST_N | ----> | LIST_A | --+
+ +---------+ +---------+
+
+ This state machine and its transition matrix are optimized for the common case when job is created in active
+ state (EMPTY_A) and at most one completion listener is added to it during its life-time.
*/
@Volatile
- private var state: Any? = Empty // shared object while we have no listeners
+ private var state: Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
@Volatile
private var registration: Job.Registration? = null
@@ -214,6 +268,11 @@
@JvmStatic
private val STATE: AtomicReferenceFieldUpdater<JobSupport, Any?> =
AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "state")
+
+ fun describeState(state: Any?): String =
+ if (state is Incomplete)
+ if (state.isActive) "Active" else "New"
+ else "Completed"
}
/**
@@ -230,7 +289,7 @@
val newRegistration = parent.onCompletion(CancelOnCompletion(parent, this))
registration = newRegistration
// now check our state _after_ registering (see updateState order of actions)
- if (state !is Active) newRegistration.unregister()
+ if (isCompleted) newRegistration.unregister()
}
/**
@@ -248,17 +307,15 @@
}
fun tryUpdateState(expect: Any, update: Any?): Boolean {
- require(expect is Active && update !is Active) // only active -> inactive transition is allowed
+ require(expect is Incomplete && update !is Incomplete) // only incomplete -> completed transition is allowed
if (!STATE.compareAndSet(this, expect, update)) return false
- // #1. Update linked state before invoking completion handlers
- onStateUpdate(update)
- // #2. Unregister from parent job
+ // Unregister from parent job
registration?.unregister() // volatile read registration _after_ state was updated
return true // continues in completeUpdateState
}
fun completeUpdateState(expect: Any, update: Any?) {
- // #3. Invoke completion handlers
+ // Invoke completion handlers
val cause = (update as? CompletedExceptionally)?.exception
var completionException: Throwable? = null
when (expect) {
@@ -277,20 +334,58 @@
}
}
- // otherwise -- do nothing (Empty)
- else -> check(expect === Empty)
+ // otherwise -- do nothing (it was Empty*)
+ else -> check(expect === EmptyActive || expect == EmptyNew)
}
- // #4. Do other (overridable) processing after completion handlers
+ // Do other (overridable) processing after completion handlers
completionException?.let { handleCompletionException(it) }
afterCompletion(update)
}
- final override val isActive: Boolean get() = state is Active
+ final override val isActive: Boolean get() {
+ val state = this.state
+ return state is Incomplete && state.isActive
+ }
+
+ final override val isCompleted: Boolean get() = state !is Incomplete
+
+ final override fun start(): Boolean {
+ while (true) { // lock-free loop on state
+ when (startInternal(state)) {
+ 0 -> return false
+ 1 -> return true
+ }
+ }
+ }
+
+ // return: 0 -> false (not new), 1 -> true (started), -1 -> retry
+ protected fun startInternal(state: Any?): Int {
+ when {
+ // EMPTY_NEW state -- no completion handlers, new
+ state === EmptyNew -> {
+ if (!STATE.compareAndSet(this, state, EmptyActive)) return -1
+ onStart()
+ return 1
+ }
+ // LIST -- a list of completion handlers (either new or active)
+ state is NodeList -> {
+ if (state.isActive) return 0
+ if (!NodeList.ACTIVE.compareAndSet(state, 0, 1)) return -1
+ onStart()
+ return 1
+ }
+ // not a new state
+ else -> return 0
+ }
+ }
+
+ // override to provide the actual start action
+ protected open fun onStart() {}
override fun getCompletionException(): Throwable {
val state = getState()
return when (state) {
- is Active -> throw IllegalStateException("Job is still active")
+ is Incomplete -> throw IllegalStateException("Job has not completed yet")
is CompletedExceptionally -> state.exception
else -> CancellationException("Job has completed normally")
}
@@ -301,27 +396,32 @@
while (true) { // lock-free loop on state
val state = this.state
when {
- // EMPTY state -- no completion handlers
- state === Empty -> {
+ // EMPTY_ACTIVE state -- no completion handlers, active
+ state === EmptyActive -> {
// try move to SINGLE state
val node = nodeCache ?: makeNode(handler).also { nodeCache = it }
if (STATE.compareAndSet(this, state, node)) return node
}
+ // EMPTY_NEW state -- no completion handlers, new
+ state === EmptyNew -> {
+ // try to promote it to list in new state
+ STATE.compareAndSet(this, state, NodeList(active = 0))
+ }
// SINGLE/SINGLE+ state -- one completion handler
state is JobNode -> {
- // try promote it to the list (SINGLE+ state)
- state.addFirstIfEmpty(NodeList())
+ // try to promote it to list (SINGLE+ state)
+ state.addFirstIfEmpty(NodeList(active = 1))
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
val list = state.next() // either NodeList or somebody else won the race, updated state
// just attempt converting it to list if state is still the same, then continue lock-free loop
STATE.compareAndSet(this, state, list)
}
- // LIST -- a list of completion handlers
+ // LIST -- a list of completion handlers (either new or active)
state is NodeList -> {
val node = nodeCache ?: makeNode(handler).also { nodeCache = it }
if (state.addLastIf(node) { this.state === state }) return node
}
- // is not active anymore
+ // is inactive
else -> {
handler((state as? CompletedExceptionally)?.exception)
return EmptyRegistration
@@ -330,26 +430,36 @@
}
}
+ final override suspend fun join() {
+ while (true) { // lock-free loop on state
+ val state = this.state as? Incomplete ?: return // fast-path - no need to wait
+ if (startInternal(state) >= 0) break // break unless needs to retry
+ }
+ return joinSuspend() // slow-path
+ }
+
+ private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
+ cont.unregisterOnCompletion(onCompletion(ResumeOnCompletion(this, cont)))
+ }
+
fun removeNode(node: JobNode) {
// remove logic depends on the state of the job
while (true) { // lock-free loop on job state
val state = this.state
- when {
- // EMPTY state -- no completion handlers
- state === Empty -> return
+ when (state) {
// SINGE/SINGLE+ state -- one completion handler
- state is JobNode -> {
+ is JobNode -> {
if (state !== this) return // a different job node --> we were already removed
// try remove and revert back to empty state
- if (STATE.compareAndSet(this, state, Empty)) return
+ if (STATE.compareAndSet(this, state, EmptyActive)) return
}
// LIST -- a list of completion handlers
- state is NodeList -> {
+ is NodeList -> {
// remove node from the list
node.remove()
return
}
- // is not active anymore
+ // it is inactive or Empty* (does not have any completion handlers)
else -> return
}
}
@@ -357,17 +467,12 @@
final override fun cancel(cause: Throwable?): Boolean {
while (true) { // lock-free loop on state
- val state = this.state as? Active ?: return false // quit if not active anymore
+ val state = this.state as? Incomplete ?: return false // quit if already complete
if (updateState(state, Cancelled(cause))) return true
}
}
/**
- * Override to make linked state changes before completion handlers are invoked.
- */
- open fun onStateUpdate(update: Any?) {}
-
- /**
* Override to process any exceptions that were encountered while invoking [onCompletion] handlers.
*/
open fun handleCompletionException(closeException: Throwable) {
@@ -384,19 +489,40 @@
?: InvokeOnCompletion(this, handler)
// for nicer debugging
- override fun toString(): String = "${javaClass.simpleName}{isActive=$isActive}"
+ override fun toString(): String = "${javaClass.simpleName}{${describeState(state)}}@${Integer.toHexString(System.identityHashCode(this))}"
/**
- * Marker interface for active [state][getState] of a job.
+ * Interface for incomplete [state][getState] of a job.
*/
- internal interface Active
-
- private object Empty : Active {
- override fun toString(): String = "Empty"
+ internal interface Incomplete {
+ val isActive: Boolean
}
- private class NodeList : LockFreeLinkedListHead(), Active {
+ private object EmptyNew : Incomplete {
+ override val isActive: Boolean get() = false
+ override fun toString(): String = "Empty{New}"
+ }
+
+ private object EmptyActive : Incomplete {
+ override val isActive: Boolean get() = true
+ override fun toString(): String = "Empty{Active}"
+ }
+
+ private class NodeList(
+ @Volatile
+ var active: Int
+ ) : LockFreeLinkedListHead(), Incomplete {
+ override val isActive: Boolean get() = active != 0
+
+ companion object {
+ @JvmStatic
+ val ACTIVE: AtomicIntegerFieldUpdater<NodeList> =
+ AtomicIntegerFieldUpdater.newUpdater(NodeList::class.java, "active")
+ }
+
override fun toString(): String = buildString {
+ append("List")
+ append(if (isActive) "{Active}" else "{New}")
append("[")
var first = true
this@NodeList.forEach<JobNode> { node ->
@@ -429,7 +555,8 @@
internal abstract class JobNode(
val job: Job
-) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler, JobSupport.Active {
+) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler, JobSupport.Incomplete {
+ final override val isActive: Boolean get() = true
// if unregister is called on this instance, then Job was an instance of JobSupport that added this node it itself
// directly without wrapping
final override fun unregister() = (job as JobSupport).removeNode(this)
@@ -468,11 +595,6 @@
override fun toString(): String = "CancelOnCompletion[$subordinateJob]"
}
-internal object EmptyRegistration : Job.Registration {
- override fun unregister() {}
- override fun toString(): String = "EmptyRegistration"
-}
-
private class CancelFutureOnCompletion(
job: Job,
val future: Future<*>
@@ -493,6 +615,6 @@
override fun toString() = "RemoveOnCompletion[$node]"
}
-private class JobImpl(parent: Job? = null) : JobSupport() {
+private class JobImpl(parent: Job? = null) : JobSupport(true) {
init { initParentJob(parent) }
}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/LazyDeferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/LazyDeferred.kt
index 68cc0a5..83c2869 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/LazyDeferred.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/LazyDeferred.kt
@@ -16,123 +16,19 @@
package kotlinx.coroutines.experimental
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
import kotlin.coroutines.experimental.CoroutineContext
-import kotlin.coroutines.experimental.startCoroutine
/**
- * Lazy deferred value is conceptually a non-blocking cancellable future that is started on
- * the first [await] or [start] invocation.
- * It is created with [lazyDefer] coroutine builder.
- *
- * Unlike a simple [Deferred] value, a lazy deferred value has five states:
- *
- * * _Pending_ (initial, _active_ state before the starts of the coroutine) --
- * [isActive] `true`, but [isComputing] `false`,
- * [isCompletedExceptionally] `false`, and [isCancelled] `false`.
- * * _Computing_ (intermediate state while computing the value) --
- * [isActive] `true`, [isComputing] `true`,
- * [isCompletedExceptionally] `false`, and [isCancelled] `false`.
- * * _Computed_ (final _completed_ state) -- [isActive] `false`, [isComputing] `false`,
- * [isCompletedExceptionally] `false`, [isCancelled] `false`.
- * * _Failed_ (final _completed_ state) -- [isActive] `false`, [isComputing] `false`,
- * [isCompletedExceptionally] `true`, [isCancelled] `false`.
- * * _Canceled_ (final _completed_ state) -- [isActive] `false`, [isComputing] `false`,
- * [isCompletedExceptionally] `true`, [isCancelled] `true`.
- *
- * If this lazy deferred value is [cancelled][cancel], then it becomes immediately complete and
- * cancels ongoing computation coroutine if it was started.
+ * **Deprecated**: `Deferred` incorporates functionality of `LazyDeferred`. See [Deferred].
*/
-public interface LazyDeferred<out T> : Deferred<T> {
- /**
- * Returns `true` if the coroutine is computing its value.
- */
- public val isComputing: Boolean
-
- /**
- * Starts coroutine to compute this lazily deferred value. The result `true` if this invocation actually
- * started coroutine or `false` if it was already started or cancelled.
- */
- public fun start(): Boolean
-}
+@Deprecated(message = "`Deferred` incorporates functionality of `LazyDeferred`", level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("Deferred"))
+typealias LazyDeferred<T> = Deferred<T>
/**
- * Lazily starts new coroutine on the first [await][Deferred.await] or [start][LazyDeferred.start] invocation
- * on the resulting [LazyDeferred].
- * The running coroutine is cancelled when the resulting value is [cancelled][Job.cancel].
- *
- * The [context] for the new coroutine must be explicitly specified.
- * See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
- * The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
+ * **Deprecated**: Replace with `async(context, start = false) { ... }`. See [async].
*/
-public fun <T> lazyDefer(context: CoroutineContext, block: suspend CoroutineScope.() -> T) : LazyDeferred<T> =
- LazyDeferredCoroutine(newCoroutineContext(context), block).apply {
- initParentJob(context[Job])
- }
-
-private class LazyDeferredCoroutine<T>(
- context: CoroutineContext,
- val block: suspend CoroutineScope.() -> T
-) : DeferredCoroutine<T>(context), LazyDeferred<T> {
-
- @Volatile
- var lazyState: Int = STATE_PENDING
-
- companion object {
- private val STATE_PENDING = 0
- private val STATE_COMPUTING = 1
- private val STATE_COMPLETE = 2
-
- private val LAZY_STATE: AtomicIntegerFieldUpdater<LazyDeferredCoroutine<*>> =
- AtomicIntegerFieldUpdater.newUpdater(LazyDeferredCoroutine::class.java, "lazyState")
- }
-
- /*
- === State linking & linearization of the overall state ===
-
- There are two state variables in this object and they have to update atomically from the standpoint of
- external observer:
- 1. Job.state that is used by isActive function.
- 2. lazyState that is used to make sure coroutine starts at most once.
- External observer must see only three states, not four, i.e. it should not be able
- to see `isActive == false`, but `isComputing == true`.
-
- On completion/cancellation state variables are updated in this order:
- a) state <- complete (isComplete starts returning true)
- b) lazyState <- STATE_COMPLETE (see onStateUpdate)
- This is why, `isComputing` checks state variables in reverse order:
- a) lazyState is checked _first_
- b) isActive is checked after it
- This way cancellation/completion is atomic w.r.t to all state functions.
-
- `start` function also has to check lazyState _before_ isActive.
- */
-
- override val isComputing: Boolean get() = lazyState == STATE_COMPUTING && isActive
-
- override fun start(): Boolean {
- while (true) { // lock-free loop on lazyState
- when (lazyState) { // volatile read
- STATE_PENDING -> {
- if (isActive) { // then volatile read Job.state (inside isActive)
- // can try to start
- if (LAZY_STATE.compareAndSet(this, STATE_PENDING, STATE_COMPUTING)) {
- block.startCoroutine(this, this)
- return true
- }
- } else {
- // cannot start -- already complete -- help update lazyState
- lazyState = STATE_COMPLETE
- return false
- }
- }
- else -> return false
- }
- }
- }
-
- override fun onStateUpdate(update: Any?) {
- lazyState = STATE_COMPLETE
- }
-}
+@Deprecated(message = "This functionality is incorporated into `async", level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("async(context, start = false, block = block)"))
+public fun <T> lazyDefer(context: CoroutineContext, block: suspend CoroutineScope.() -> T) : Deferred<T> =
+ async(context, start = false, block = block)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
index e85b9c7..83953db 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
@@ -31,8 +31,24 @@
* ```
*/
object NonCancellable : AbstractCoroutineContextElement(Job), Job {
+ /** Always returns `true`. */
override val isActive: Boolean get() = true
+
+ /** Always returns `false`. */
+ override val isCompleted: Boolean get() = false
+
+ /** Always returns `false`. */
+ override fun start(): Boolean = false
+
+ /** Always throws [UnsupportedOperationException]. */
+ suspend override fun join() { throw UnsupportedOperationException("This job is always active") }
+
+ /** Always throws [IllegalStateException]. */
override fun getCompletionException(): CancellationException = throw IllegalStateException("This job is always active")
+
+ /** Always returns [EmptyRegistration]. */
override fun onCompletion(handler: CompletionHandler): Job.Registration = EmptyRegistration
+
+ /** Always returns `false`. */
override fun cancel(cause: Throwable?): Boolean = false
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelBuilder.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelBuilder.kt
index 10a45a9..6f5e652 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelBuilder.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelBuilder.kt
@@ -64,7 +64,7 @@
private class ChannelCoroutine<E>(
context: CoroutineContext,
val channel: Channel<E>
-) : AbstractCoroutine<Unit>(context), ChannelBuilder<E>, ChannelJob<E>, Channel<E> by channel {
+) : AbstractCoroutine<Unit>(context, active = true), ChannelBuilder<E>, ChannelJob<E>, Channel<E> by channel {
override fun afterCompletion(state: Any?) {
val cause = (state as? CompletedExceptionally)?.exception
if (!channel.close(cause) && cause != null)