Coroutines now wait for their children
JobSuport.attachChild is introduced;
Job "Completing" state is introduced;
withTimeout is a proper coroutine;
Better diagnostics in cancellation and unexpected exception messages;
Fixed cancellable suspending function to throw CancellationException;
Job.getCompletionException renamed to Job.getCancellationException;
Introduced Deferred.getCompletionExceptionOrNull
Updated docs for Job & Deferred to explain parent/child;
Deprecate and hide legacy Job.invokeOnCompletion signatures;
Updated guide for parent-child relations and related stuff
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt
index b85fd80..eca5f1a 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt
@@ -20,9 +20,12 @@
import kotlin.coroutines.experimental.CoroutineContext
/**
- * Abstract class to simplify writing of coroutine completion objects that
- * implement completion [Continuation], [Job], and [CoroutineScope] interfaces.
- * It stores the result of continuation in the state of the job.
+ * Abstract class for coroutines.
+ *
+ * * Coroutines implement completion [Continuation], [Job], and [CoroutineScope] interfaces.
+ * * Coroutine stores the result of continuation in the state of the job.
+ * * Coroutine waits for children coroutines to finish before completing.
+ * * Coroutines are cancelled through an intermediate _cancelling_ state.
*
* @param active when `true` coroutine is created in _active_ state, when `false` in _new_ state. See [Job] for details.
* @suppress **This is unstable API and it is subject to change.**
@@ -35,35 +38,26 @@
public final override val context: CoroutineContext = parentContext + this
public final override val coroutineContext: CoroutineContext get() = context
+ // all coroutines are cancelled through an intermediate cancelling state
final override val hasCancellingState: Boolean get() = true
+ protected open val defaultResumeMode: Int get() = MODE_ATOMIC_DEFAULT
+
final override fun resume(value: T) {
- loopOnState { state ->
- when (state) {
- is Incomplete -> if (updateState(state, value, MODE_ATOMIC_DEFAULT)) return
- is Cancelled -> return // ignore resumes on cancelled continuation
- else -> error("Already resumed, but got value $value")
- }
- }
+ makeCompleting(value, defaultResumeMode)
}
final override fun resumeWithException(exception: Throwable) {
- loopOnState { state ->
- when (state) {
- is Incomplete -> {
- if (updateState(state, CompletedExceptionally(exception), MODE_ATOMIC_DEFAULT)) return
- }
- is Cancelled -> {
- // ignore resumes on cancelled continuation, but handle exception if a different one is here
- if (exception !== state.exception) handleCoroutineException(context, exception)
- return
- }
- else -> throw IllegalStateException("Already resumed, but got exception $exception", exception)
- }
- }
+ makeCompleting(CompletedExceptionally(exception), defaultResumeMode)
}
final override fun handleException(exception: Throwable) {
handleCoroutineException(parentContext, exception)
}
-}
\ No newline at end of file
+
+ override fun nameString(): String {
+ val coroutineName = context.coroutineName ?: return super.nameString()
+ return "\"$coroutineName\":${super.nameString()}"
+ }
+}
+
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 6c19bcb..2c001e3 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -141,11 +141,10 @@
@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
val currentThread = Thread.currentThread()
- val eventLoop = if (context[ContinuationInterceptor] == null) EventLoopImpl(currentThread) else null
+ val eventLoop = if (context[ContinuationInterceptor] == null) BlockingEventLoop(currentThread) else null
val newContext = newCoroutineContext(context + (eventLoop ?: EmptyCoroutineContext))
val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop = eventLoop != null)
coroutine.initParentJob(context[Job])
- eventLoop?.initParentJob(coroutine)
block.startCoroutine(coroutine, coroutine)
return coroutine.joinBlocking()
}
@@ -156,9 +155,9 @@
private val parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
- override fun afterCompletion(state: Any?, mode: Int) {
+ override fun onCancellation(exceptionally: CompletedExceptionally?) {
// note the use of the parent's job context below!
- if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
+ if (exceptionally != null) handleCoroutineException(parentContext, exceptionally.exception)
}
}
@@ -209,10 +208,14 @@
private val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
init {
- if (privateEventLoop) require(eventLoop is EventLoopImpl)
+ if (privateEventLoop) require(eventLoop is BlockingEventLoop)
}
override fun afterCompletion(state: Any?, mode: Int) {
+ // signal termination to event loop (don't accept more tasks)
+ if (privateEventLoop)
+ (eventLoop as BlockingEventLoop).isCompleted = true
+ // wake up blocked thread
if (Thread.currentThread() != blockedThread)
LockSupport.unpark(blockedThread)
}
@@ -228,11 +231,12 @@
timeSource.parkNanos(this, parkNanos)
}
// process queued events (that could have been added after last processNextEvent and before cancel
- if (privateEventLoop) (eventLoop as EventLoopImpl).shutdown()
+ if (privateEventLoop) (eventLoop as BlockingEventLoop).shutdown()
timeSource.unregisterTimeLoopThread()
// now return result
val state = this.state
(state as? CompletedExceptionally)?.let { throw it.exception }
return state as T
}
+
}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index b90973a..5c3144a 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -162,7 +162,7 @@
* @suppress **This is unstable API and it is subject to change.**
*/
public fun CancellableContinuation<*>.removeOnCancel(node: LockFreeLinkedListNode): DisposableHandle =
- invokeOnCompletion(RemoveOnCancel(this, node))
+ invokeOnCompletion(handler = RemoveOnCancel(this, node))
// --------------- implementation details ---------------
@@ -245,7 +245,7 @@
}
override fun completeResume(token: Any) {
- completeUpdateState(token, state, resumeMode)
+ completeUpdateState(token as Incomplete, state, resumeMode)
}
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
@@ -258,7 +258,8 @@
resumeWithExceptionImpl(exception, if (dc.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
}
- override fun toString(): String = super.toString() + "[${delegate.toDebugString()}]"
+ override fun nameString(): String =
+ "CancellableContinuation(${delegate.toDebugString()})"
}
private class CompletedIdempotentResult(
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
index 602b9b9..b2c3e55 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
@@ -89,33 +89,40 @@
* Executes a block using a given coroutine context.
*/
internal inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T {
- val oldName = updateContext(context)
+ val oldName = context.updateThreadContext()
try {
return block()
} finally {
- restoreContext(oldName)
+ restoreThreadContext(oldName)
}
}
@PublishedApi
-internal fun updateContext(context: CoroutineContext): String? {
+internal fun CoroutineContext.updateThreadContext(): String? {
if (!DEBUG) return null
- val newId = context[CoroutineId] ?: return null
+ val coroutineId = this[CoroutineId] ?: return null
+ val coroutineName = this[CoroutineName]?.name ?: "coroutine"
val currentThread = Thread.currentThread()
val oldName = currentThread.name
- val coroutineName = context[CoroutineName]?.name ?: "coroutine"
currentThread.name = buildString(oldName.length + coroutineName.length + 10) {
append(oldName)
append(" @")
append(coroutineName)
append('#')
- append(newId.id)
+ append(coroutineId.id)
}
return oldName
}
+internal val CoroutineContext.coroutineName: String? get() {
+ if (!DEBUG) return null
+ val coroutineId = this[CoroutineId] ?: return null
+ val coroutineName = this[CoroutineName]?.name ?: "coroutine"
+ return "$coroutineName#${coroutineId.id}"
+}
+
@PublishedApi
-internal fun restoreContext(oldName: String?) {
+internal fun restoreThreadContext(oldName: String?) {
if (oldName != null) Thread.currentThread().name = oldName
}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
index 6176392..3b03d18 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
@@ -115,7 +115,7 @@
val job = if (cancellable) context[Job] else null
withCoroutineContext(context) {
when {
- job != null && !job.isActive -> continuation.resumeWithException(job.getCompletionException())
+ job != null && !job.isActive -> continuation.resumeWithException(job.getCancellationException())
exception -> continuation.resumeWithException(value as Throwable)
else -> continuation.resume(value as T)
}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt
index 8e83d62..609f092 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt
@@ -22,13 +22,14 @@
/**
* Helper function for coroutine builder implementations to handle uncaught exception in coroutines.
+ *
* It tries to handle uncaught exception in the following way:
* * If there is [CoroutineExceptionHandler] in the context, then it is used.
* * Otherwise, if exception is [CancellationException] then it is ignored
* (because that is the supposed mechanism to cancel the running coroutine)
- * * Otherwise, if there is a [Job] in the context, then [Job.cancel] is invoked and if it
- * returns `true` (it was still active), then the exception is considered to be handled.
- * * Otherwise, current thread's [Thread.uncaughtExceptionHandler] is used.
+ * * Otherwise:
+ * * if there is a [Job] in the context, then [Job.cancel] is invoked;
+ * * and current thread's [Thread.uncaughtExceptionHandler] is invoked.
*/
fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
context[CoroutineExceptionHandler]?.let {
@@ -37,15 +38,23 @@
}
// ignore CancellationException (they are normal means to terminate a coroutine)
if (exception is CancellationException) return
- // quit if successfully pushed exception as cancellation reason
- if (context[Job]?.cancel(exception) ?: false) return
- // otherwise just use thread's handler
+ // try cancel job in the context
+ context[Job]?.cancel(exception)
+ // use thread's handler
val currentThread = Thread.currentThread()
currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
}
/**
* An optional element on the coroutine context to handle uncaught exceptions.
+ *
+ * By default, when no handler is installed, uncaught exception are handled in the following way:
+ * * If exception is [CancellationException] then it is ignored
+ * (because that is the supposed mechanism to cancel the running coroutine)
+ * * Otherwise:
+ * * if there is a [Job] in the context, then [Job.cancel] is invoked;
+ * * and current thread's [Thread.uncaughtExceptionHandler] is invoked.
+ *
* See [handleCoroutineException].
*/
public interface CoroutineExceptionHandler : CoroutineContext.Element {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index 8d5f520..23518ad 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -32,6 +32,7 @@
* | --------------------------------------- | ---------- | ------------- | -------------------------- | ------------- |
* | _New_ (optional initial state) | `false` | `false` | `false` | `false` |
* | _Active_ (default initial state) | `true` | `false` | `false` | `false` |
+ * | _Completing_ (optional transient state) | `true` | `false` | `false` | `false` |
* | _Cancelling_ (optional transient state) | `false` | `false` | `false` | `true` |
* | _Cancelled_ (final state) | `false` | `true` | `true` | `true` |
* | _Resolved_ (final state) | `false` | `true` | `false` | `false` |
@@ -46,18 +47,19 @@
* _cancelling_ state immediately. A simple implementation of deferred -- [CompletableDeferred],
* that is not backed by a coroutine, does not have a _cancelling_ state, but becomes _cancelled_
* on [cancel] immediately. Coroutines, on the other hand, become _cancelled_ only when they finish
- * executing their code.
+ * executing their code and after all their [children][attachChild] complete.
*
* ```
- * +-----+ start +--------+ complete +-----------+
- * | New | ---------------> | Active | ---------+-> | Resolved |
- * +-----+ +--------+ | |(completed)|
- * | | | +-----------+
- * | cancel | cancel |
- * V V | +-----------+
- * +-----------+ finish +------------+ +-> | Failed |
- * | Cancelled | <--------- | Cancelling | |(completed)|
- * |(completed)| +------------+ +-----------+
+ * wait children
+ * +-----+ start +--------+ complete +-------------+ finish +-----------+
+ * | New | ---------------> | Active | ----------> | Completing | ---+-> | Resolved |
+ * +-----+ +--------+ +-------------+ | |(completed)|
+ * | | | | +-----------+
+ * | cancel | cancel | cancel |
+ * V V | | +-----------+
+ * +-----------+ finish +------------+ | +-> | Failed |
+ * | Cancelled | <--------- | Cancelling | <---------------+ |(completed)|
+ * |(completed)| +------------+ +-----------+
* +-----------+
* ```
*
@@ -68,7 +70,9 @@
* or the cancellation cause inside the coroutine.
*
* A deferred value can have a _parent_ job. A deferred value with a parent is cancelled when its parent is
- * cancelled or completes.
+ * cancelled or completes. Parent waits for all its [children][attachChild] to complete in _completing_ or
+ * _cancelling_ state. _Completing_ state is purely internal. For an outside observer a _completing_
+ * deferred is still active, while internally it is waiting for its children.
*
* All functions on this interface and on all interfaces derived from it are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
@@ -108,11 +112,21 @@
* [completed exceptionally][isCompletedExceptionally].
*
* This function is designed to be used from [invokeOnCompletion] handlers, when there is an absolute certainty that
- * the value is already complete.
+ * the value is already complete. See also [getCompletionExceptionOrNull].
*/
public fun getCompleted(): T
/**
+ * Returns *completion exception* result if this deferred [completed exceptionally][isCompletedExceptionally],
+ * `null` if it is completed normally, or throws [IllegalStateException] if this deferred value has not
+ * [completed][isCompleted] yet.
+ *
+ * This function is designed to be used from [invokeOnCompletion] handlers, when there is an absolute certainty that
+ * the value is already complete. See also [getCompleted].
+ */
+ public fun getCompletionExceptionOrNull(): Throwable?
+
+ /**
* @suppress **Deprecated**: Use `isActive`.
*/
@Deprecated(message = "Use `isActive`", replaceWith = ReplaceWith("isActive"))
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
index 0c2c9c5..26e8ffa 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
@@ -247,20 +247,11 @@
}
}
-internal class EventLoopImpl(
+internal abstract class ThreadEventLoop(
private val thread: Thread
) : EventLoopBase() {
- private var parentJob: Job? = null
-
- override val canComplete: Boolean get() = parentJob != null
- override val isCompleted: Boolean get() = parentJob?.isCompleted == true
override fun isCorrectThread(): Boolean = Thread.currentThread() === thread
- fun initParentJob(coroutine: Job) {
- require(this.parentJob == null)
- this.parentJob = coroutine
- }
-
override fun unpark() {
if (Thread.currentThread() !== thread)
timeSource.unpark(thread)
@@ -274,5 +265,23 @@
// reschedule the rest of delayed tasks
rescheduleAllDelayed()
}
+
}
+private class EventLoopImpl(thread: Thread) : ThreadEventLoop(thread) {
+ private var parentJob: Job? = null
+
+ override val canComplete: Boolean get() = parentJob != null
+ override val isCompleted: Boolean get() = parentJob?.isCompleted == true
+
+ fun initParentJob(parentJob: Job) {
+ require(this.parentJob == null)
+ this.parentJob = parentJob
+ }
+}
+
+internal class BlockingEventLoop(thread: Thread) : ThreadEventLoop(thread) {
+ override val canComplete: Boolean get() = true
+ @Volatile
+ public override var isCompleted: Boolean = false
+}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 86fd3f5..fef9fa6 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -21,6 +21,7 @@
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
import kotlinx.coroutines.experimental.internal.OpDescriptor
+import kotlinx.coroutines.experimental.internal.unwrap
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
import kotlinx.coroutines.experimental.selects.SelectClause0
import kotlinx.coroutines.experimental.selects.SelectClause1
@@ -29,6 +30,7 @@
import java.util.concurrent.Future
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
// --------------- core job interfaces ---------------
@@ -47,6 +49,7 @@
* | --------------------------------------- | ---------- | ------------- | ------------- |
* | _New_ (optional initial state) | `false` | `false` | `false` |
* | _Active_ (default initial state) | `true` | `false` | `false` |
+ * | _Completing_ (optional transient state) | `true` | `false` | `false` |
* | _Cancelling_ (optional transient state) | `false` | `false` | `true` |
* | _Cancelled_ (final state) | `false` | `true` | `true` |
* | _Completed normally_ (final state) | `false` | `true` | `false` |
@@ -56,30 +59,33 @@
* [CoroutineStart.LAZY]. Such a job can be made _active_ by invoking [start] or [join].
*
* A job can be _cancelled_ at any time with [cancel] function that forces it to transition to
- * _cancelling_ state immediately. Simple jobs, that are not backed by a coroutine, like
- * [CompletableDeferred] and the result of `Job()` factory function, don't
- * have a _cancelling_ state, but become _cancelled_ on [cancel] immediately.
- * Coroutines, on the other hand, become _cancelled_ only when they finish executing their code.
+ * _cancelling_ state immediately. Job that is not backed by a coroutine and does not have
+ * [children][attachChild] becomes _cancelled_ on [cancel] immediately.
+ * Otherwise, job becomes _cancelled_ when it finishes executing its code and
+ * when all its children [complete][isCompleted].
*
* ```
- * +-----+ start +--------+ complete +-----------+
- * | New | ---------------> | Active | -----------> | Completed |
- * +-----+ +--------+ | normally |
- * | | +-----------+
- * | cancel | cancel
- * V V
- * +-----------+ finish +------------+
- * | Cancelled | <--------- | Cancelling |
+ * wait children
+ * +-----+ start +--------+ complete +-------------+ finish +-----------+
+ * | New | ---------------> | Active | -----------> | Completing | -------> | Completed |
+ * +-----+ +--------+ +-------------+ | normally |
+ * | | | +-----------+
+ * | cancel | cancel | cancel
+ * V V |
+ * +-----------+ finish +------------+ |
+ * | Cancelled | <--------- | Cancelling | <----------------+
* |(completed)| +------------+
* +-----------+
* ```
*
- * A job in the coroutine [context][CoroutineScope.context] represents the coroutine itself.
+ * A job in the [coroutineContext][CoroutineScope.coroutineContext] represents the coroutine itself.
* A job is active while the coroutine is working and job's cancellation aborts the coroutine when
- * the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException]
- * or the cancellation cause inside the coroutine.
+ * the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException].
*
* A job can have a _parent_ job. A job with a parent is cancelled when its parent is cancelled or completes.
+ * Parent job waits for all its [children][attachChild] to complete in _completing_ or _cancelling_ state.
+ * _Completing_ state is purely internal to the job. For an outside observer a _completing_ job is still active,
+ * while internally it is waiting for its children.
*
* All functions on this interface and on all interfaces derived from it are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
@@ -111,12 +117,15 @@
/**
* Returns `true` when this job is active -- it was already started and has not completed or cancelled yet.
+ * The job that is waiting for its [children][attachChild] to complete is still considered to be active if it
+ * was not cancelled.
*/
public val isActive: Boolean
/**
* Returns `true` when this job has completed for any reason. A job that was cancelled and has
- * finished its execution is also considered complete.
+ * finished its execution is also considered complete. Job becomes complete only after
+ * all its [children][attachChild] complete.
*/
public val isCompleted: Boolean
@@ -127,16 +136,26 @@
public val isCancelled: Boolean
/**
- * Returns the exception that signals the completion of this job -- it returns the original
- * [cancel] cause or an instance of [CancellationException] if this job had completed
- * normally or was cancelled without a cause. This function throws
- * [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
- * [isCancelled] yet.
+ * Returns [CancellationException] that signals the completion of this job.
+ *
+ * It returns the original [cancel] cause if it is an instance of [CancellationException] or
+ * an instance of [JobCancellationException] if this job was cancelled with a cause of
+ * different type, was cancelled without a cause or had completed normally.
+ *
+ * This function throws [IllegalStateException] when invoked for an job that has not
+ * [completed][isCompleted] nor [cancelled][isCancelled] yet.
*
* The [cancellable][suspendCancellableCoroutine] suspending functions throw this exception
* when trying to suspend in the context of this job.
*/
- public fun getCompletionException(): Throwable
+ public fun getCancellationException(): CancellationException
+
+ /**
+ * @suppress **Deprecated**: Renamed to [getCancellationException]
+ */
+ @Deprecated("Renamed to getCancellationException", replaceWith = ReplaceWith("getCancellationException()"))
+ public fun getCompletionException(): Throwable =
+ getCancellationException()
// ------------ state update ------------
@@ -148,7 +167,7 @@
public fun start(): Boolean
/**
- * Cancel this job with an optional cancellation [cause]. The result is `true` if this job was
+ * Cancels this job with an optional cancellation [cause]. The result is `true` if this job was
* cancelled as a result of this invocation and `false` otherwise
* (if it was already _completed_ or if it is [NonCancellable]).
* Repeated invocations of this function have no effect and always produce `false`.
@@ -159,18 +178,54 @@
*/
public fun cancel(cause: Throwable? = null): Boolean
+ // ------------ parent-child ------------
+
+ /**
+ * Attaches child job so that this job becomes its parent and
+ * returns a handle that should be used to detach it.
+ *
+ * A parent-child relation has the following effect:
+ * * Cancellation of parent with [cancel] immediately cancels all its children with the same cause.
+ * * Parent cannot complete until all its children are complete. Parent waits for all its children to
+ * complete first in _completing_ or _cancelling_ state.
+ *
+ * A child must store the resulting [DisposableHandle] and [dispose][DisposableHandle.dispose] the attachment
+ * to its parent on its own completion.
+ *
+ * Coroutine builders and job factory functions that accept `parent` [CoroutineContext] parameter
+ * lookup a [Job] instance in the parent context and use this function to attach themselves as a child.
+ * They also store a reference to the resulting [DisposableHandle] and dispose a handle when they complete.
+ */
+ public fun attachChild(child: Job): DisposableHandle
+
+ /**
+ * Cancels all [children][attachChild] jobs of this coroutine with the given [cause]. Unlike [cancel],
+ * the state of this job itself is not affected.
+ */
+ public fun cancelChildren(cause: Throwable? = null)
+
// ------------ state waiting ------------
/**
* Suspends coroutine until this job is complete. This invocation resumes normally (without exception)
- * when the job is complete for any reason. This function also [starts][Job.start] the corresponding coroutine
- * if the [Job] was still in _new_ state.
+ * when the job is complete for any reason and the [Job] of the invoking coroutine is still [active][isActive].
+ * This function also [starts][Job.start] the corresponding coroutine if the [Job] was still in _new_ state.
*
- * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
- * suspending function is suspended, this function immediately resumes with [CancellationException].
+ * Note, that the job becomes complete only when all its [children][attachChild] are complete.
+ *
+ * This suspending function is cancellable and **always** checks for the cancellation of invoking coroutine's Job.
+ * If the [Job] of the invoking coroutine is cancelled or completed when this
+ * suspending function is invoked or while it is suspended, this function
+ * throws [CancellationException].
+ *
+ * In particular, it means that a parent coroutine invoking `join` on a child coroutine that was started using
+ * `launch(coroutineContext) { ... }` builder throws [CancellationException] if the child
+ * had crashed, unless a non-standard [CoroutineExceptionHandler] if installed in the context.
*
* This function can be used in [select] invocation with [onJoin] clause.
* Use [isCompleted] to check for completion of this job without waiting.
+ *
+ * There is [cancelAndJoin] function that combines an invocation of [cancel] and `join`.
*/
public suspend fun join()
@@ -182,29 +237,12 @@
// ------------ low-level state-notification ------------
- /**
- * Registers handler that is **synchronously** invoked once on completion of this job.
- * When job is already complete, then the handler is immediately invoked
- * with a job's cancellation cause or `null`. Otherwise, handler will be invoked once when this
- * job is complete.
- *
- * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the
- * registration of this handler and release its memory if its invocation is no longer needed.
- * There is no need to dispose the handler after completion of this job. The references to
- * all the handlers are released when this job completes.
- *
- * Note, that the handler is not invoked on invocation of [cancel] when
- * job becomes _cancelling_, but only when the corresponding coroutine had finished execution
- * of its code and became _cancelled_. There is an overloaded version of this function
- * with `onCancelling` parameter to receive notification on _cancelling_ state.
- *
- * **Note**: This function is a part of internal machinery that supports parent-child hierarchies
- * and allows for implementation of suspending functions that wait on the Job's state.
- * This function should not be used in general application code.
- * Implementations of `CompletionHandler` must be fast and _lock-free_.
- */
+ @Deprecated(message = "For binary compatibility", level = DeprecationLevel.HIDDEN)
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
+ @Deprecated(message = "For binary compatibility", level = DeprecationLevel.HIDDEN)
+ public fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle
+
/**
* Registers handler that is **synchronously** invoked once on cancellation or completion of this job.
* When job is already complete, then the handler is immediately invoked
@@ -214,7 +252,7 @@
* Invocation of this handler on a transition to a transient _cancelling_ state
* is controlled by [onCancelling] boolean parameter.
* The handler is invoked on invocation of [cancel] when
- * job becomes _cancelling_ when [onCancelling] parameters is set to `true`. However,
+ * job becomes _cancelling_ if [onCancelling] parameter is set to `true`. However,
* when this [Job] is not backed by a coroutine, like [CompletableDeferred] or [CancellableContinuation]
* (both of which do not posses a _cancelling_ state), then the value of [onCancelling] parameter is ignored.
*
@@ -223,12 +261,15 @@
* There is no need to dispose the handler after completion of this job. The references to
* all the handlers are released when this job completes.
*
+ * Installed [handler] should not throw any exceptions. If it does, they will get caught,
+ * wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code.
+ *
* **Note**: This function is a part of internal machinery that supports parent-child hierarchies
* and allows for implementation of suspending functions that wait on the Job's state.
* This function should not be used in general application code.
* Implementations of `CompletionHandler` must be fast and _lock-free_.
*/
- public fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle
+ public fun invokeOnCompletion(onCancelling: Boolean = false, handler: CompletionHandler): DisposableHandle
// ------------ unstable internal API ------------
@@ -266,6 +307,7 @@
* Creates a new job object in an _active_ state.
* It is optionally a child of a [parent] job.
*/
+@Suppress("FunctionName")
public fun Job(parent: Job? = null): Job = JobImpl(parent)
/**
@@ -291,12 +333,20 @@
/**
* Handler for [Job.invokeOnCompletion].
*
+ * Installed handler should not throw any exceptions. If it does, they will get caught,
+ * wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code.
+ *
* **Note**: This type is a part of internal machinery that supports parent-child hierarchies
* and allows for implementation of suspending functions that wait on the Job's state.
* This type should not be used in general application code.
* Implementations of `CompletionHandler` must be fast and _lock-free_.
*/
-public typealias CompletionHandler = (Throwable?) -> Unit
+public typealias CompletionHandler = (cause: Throwable?) -> Unit
+
+/**
+ * This exception gets thrown if an exception is caught while processing [CompletionHandler] invocation for [Job].
+ */
+public class CompletionHandlerException(message: String, cause: Throwable) : RuntimeException(message, cause)
/**
* Thrown by cancellable suspending functions if the [Job] of the coroutine is cancelled while it is suspending.
@@ -304,6 +354,35 @@
public typealias CancellationException = java.util.concurrent.CancellationException
/**
+ * Thrown by cancellable suspending functions if the [Job] of the coroutine is cancelled or completed
+ * without cause, or with a cause or exception that is not [CancellationException]
+ * (see [Job.getCancellationException]).
+ */
+public class JobCancellationException(
+ message: String,
+ cause: Throwable?,
+ /**
+ * The job that was cancelled.
+ */
+ public val job: Job
+) : CancellationException(message) {
+ init { if (cause != null) initCause(cause) }
+ override fun toString(): String = "${super.toString()}; job=$job"
+ override fun equals(other: Any?): Boolean =
+ other === this ||
+ other is JobCancellationException && other.message == message && other.job == job && other.cause == cause
+ override fun hashCode(): Int =
+ (message!!.hashCode() * 31 + job.hashCode()) * 31 + (cause?.hashCode() ?: 0)
+}
+
+/**
+ * Represents an exception in the coroutine that was not caught by it and was not expected to be thrown.
+ * This happens when coroutine is cancelled, but it completes with the different exception than its cancellation
+ * cause was.
+ */
+public class UnexpectedCoroutineException(message: String, cause: Throwable) : IllegalStateException(message, cause)
+
+/**
* Unregisters a specified [registration] when this job is complete.
*
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
@@ -315,7 +394,7 @@
@Deprecated(message = "Renamed to `disposeOnCompletion`",
replaceWith = ReplaceWith("disposeOnCompletion(registration)"))
public fun Job.unregisterOnCompletion(registration: DisposableHandle): DisposableHandle =
- invokeOnCompletion(DisposeOnCompletion(this, registration))
+ invokeOnCompletion(handler = DisposeOnCompletion(this, registration))
/**
* Disposes a specified [handle] when this job is complete.
@@ -326,7 +405,7 @@
* ```
*/
public fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle =
- invokeOnCompletion(DisposeOnCompletion(this, handle))
+ invokeOnCompletion(handler = DisposeOnCompletion(this, handle))
/**
* Cancels a specified [future] when this job is complete.
@@ -337,13 +416,19 @@
* ```
*/
public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle =
- invokeOnCompletion(CancelFutureOnCompletion(this, future))
+ invokeOnCompletion(handler = CancelFutureOnCompletion(this, future))
/**
* Cancels the job and suspends invoking coroutine until the cancelled job is complete.
*
- * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
- * suspending function is suspended, this function immediately resumes with [CancellationException].
+ * This suspending function is cancellable and **always** checks for the cancellation of invoking coroutine's Job.
+ * If the [Job] of the invoking coroutine is cancelled or completed when this
+ * suspending function is invoked or while it is suspended, this function
+ * throws [CancellationException].
+ *
+ * In particular, it means that a parent coroutine invoking `cancelAndJoin` on a child coroutine that was started using
+ * `launch(coroutineContext) { ... }` builder throws [CancellationException] if the child
+ * had crashed, unless a non-standard [CoroutineExceptionHandler] if installed in the context.
*
* This is a shortcut for the invocation of [cancel][Job.cancel] followed by [join][Job.join].
*/
@@ -353,6 +438,23 @@
}
/**
+ * Cancels [Job] of this context with an optional cancellation [cause]. The result is `true` if the job was
+ * cancelled as a result of this invocation and `false` if there is no job in the context or if it was already
+ * cancelled or completed. See [Job.cancel] for details.
+ */
+public fun CoroutineContext.cancel(cause: Throwable? = null): Boolean =
+ this[Job]?.cancel(cause) ?: false
+
+/**
+ * Cancels all children of the [Job] in this context with an optional cancellation [cause].
+ * It does not do anything if there is no job in the context or it has no children.
+ * See [Job.cancelChildren] for details.
+ */
+public fun CoroutineContext.cancelChildren(cause: Throwable? = null) {
+ this[Job]?.cancelChildren(cause)
+}
+
+/**
* @suppress **Deprecated**: `join` is now a member function of `Job`.
*/
@Suppress("EXTENSION_SHADOWED_BY_MEMBER", "DeprecatedCallableAddReplaceWith")
@@ -364,6 +466,7 @@
*/
@Deprecated(message = "Replace with `NonDisposableHandle`",
replaceWith = ReplaceWith("NonDisposableHandle"))
+@Suppress("unused")
typealias EmptyRegistration = NonDisposableHandle
/**
@@ -377,7 +480,7 @@
override fun toString(): String = "NonDisposableHandle"
}
-// --------------- utility classes to simplify job implementation
+// --------------- helper classes to simplify job implementation
/**
* A concrete implementation of [Job]. It is optionally a child to a parent job.
@@ -403,46 +506,50 @@
SINGLE+ JobNode : Active a single listener + NodeList added as its next
LIST_N NodeList : New a list of listeners (promoted once, does not got back to EmptyNew)
LIST_A NodeList : Active a list of listeners (promoted once, does not got back to JobNode/EmptyActive)
- CANCELLING Cancelling : Cancelling(*) a list of listeners (promoted once)
+ COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*)
+ CANCELLING Finishing : Cancelling has a list of listeners (promoted once from LIST_*)
FINAL_C Cancelled : Cancelled cancelled (final state)
FINAL_F Failed : Completed failed for other reason (final state)
FINAL_R <any> : Completed produced some result
=== Transitions ===
- New states Active states Inactive states
- +---------+ +---------+ +----------+
- | EMPTY_N | --+-> | EMPTY_A | --+-> | FINAL_* |
- +---------+ | +---------+ | +----------+
- | | | ^ |
- | | V | |
- | | +---------+ |
- | | | SINGLE | --+
- | | +---------+ |
- | | | |
- | | V |
- | | +---------+ |
- | +-- | SINGLE+ | --+
- | +---------+ |
- | | |
- V V |
- +---------+ +---------+ |
- | LIST_N | ----> | LIST_A | --+
- +---------+ +---------+ |
- | | |
- | V |
- | +------------+ |
- +-------> | CANCELLING | --+
- +------------+
+ New states Active states Inactive states
+
+ +---------+ +---------+ }
+ | EMPTY_N | --+-> | EMPTY_A | ----+ } Empty states
+ +---------+ | +---------+ | }
+ | | | ^ | +----------+
+ | | | | +--> | FINAL_* |
+ | | V | | +----------+
+ | | +---------+ | }
+ | | | SINGLE | ----+ } JobNode states
+ | | +---------+ | }
+ | | | | }
+ | | V | }
+ | | +---------+ | }
+ | +-- | SINGLE+ | ----+ }
+ | +---------+ | }
+ | | |
+ V V |
+ +---------+ +---------+ | }
+ | LIST_N | ----> | LIST_A | ----+ } NodeList states
+ +---------+ +---------+ | }
+ | | | | |
+ | | +--------+ | |
+ | | | V |
+ | | | +------------+ | +------------+ }
+ | +-------> | COMPLETING | --+-- | CANCELLING | } Finishing states
+ | | +------------+ +------------+ }
+ | | | ^
+ | | | |
+ +--------+---------+--------------------+
+
This state machine and its transition matrix are optimized for the common case when job is created in active
state (EMPTY_A) and at most one completion listener is added to it during its life-time.
Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
-
- (*) The CANCELLING state is used only in AbstractCoroutine class. A general Job (that does not
- extend AbstractCoroutine) does not have CANCELLING state. It immediately transitions to
- FINAL_C (Cancelled) state on cancellation/completion
*/
// Note: use shared objects while we have no listeners
@@ -464,27 +571,10 @@
return
}
parent.start() // make sure the parent is started
- // directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
- val newRegistration = parent.invokeOnCompletion(ParentOnCancellation(parent), onCancelling = true)
- parentHandle = newRegistration
+ val handle = parent.attachChild(this)
+ parentHandle = handle
// now check our state _after_ registering (see updateState order of actions)
- if (isCompleted) newRegistration.dispose()
- }
-
- private inner class ParentOnCancellation(parent: Job) : JobCancellationNode<Job>(parent) {
- override fun invokeOnce(reason: Throwable?) { onParentCancellation(reason) }
- override fun toString(): String = "ParentOnCancellation[${this@JobSupport}]"
- }
-
- /**
- * Invoked at most once on parent completion.
- * @suppress **This is unstable API and it is subject to change.**
- */
- protected open fun onParentCancellation(cause: Throwable?) {
- // if parent was completed with CancellationException then use it as the cause of our cancellation, too.
- // however, we shall not use application specific exceptions here. So if parent crashes due to IOException,
- // we cannot and should not cancel the child with IOException
- cancel(cause as? CancellationException)
+ if (isCompleted) handle.dispose()
}
// ------------ state query ------------
@@ -499,6 +589,9 @@
}
}
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
protected inline fun loopOnState(block: (Any?) -> Unit): Nothing {
while (true) {
block(state)
@@ -514,41 +607,57 @@
public final override val isCancelled: Boolean get() {
val state = this.state
- return state is Cancelled || state is Cancelling
+ return state is Cancelled || (state is Finishing && state.cancelled != null)
}
// ------------ state update ------------
/**
- * Updates current [state] of this job.
+ * Updates current [state] of this job. Returns `false` if current state is not equal to expected.
+ * @suppress **This is unstable API and it is subject to change.**
*/
- protected fun updateState(expect: Any, proposedUpdate: Any?, mode: Int): Boolean {
+ internal fun updateState(expect: Incomplete, proposedUpdate: Any?, mode: Int): Boolean {
val update = coerceProposedUpdate(expect, proposedUpdate)
if (!tryUpdateState(expect, update)) return false
completeUpdateState(expect, update, mode)
// if an exceptional completion was suppressed (because cancellation was in progress), then report it separately
- if (proposedUpdate !== update && proposedUpdate is CompletedExceptionally && proposedUpdate.cause != null)
- handleException(proposedUpdate.cause)
+ if (proposedUpdate is CompletedExceptionally && proposedUpdate.cause != null && !incorporatedCause(update, proposedUpdate.cause)) {
+ handleException(UnexpectedCoroutineException("Unexpected exception while cancellation is in progress; job=$this", proposedUpdate.cause))
+ }
return true
}
+ /**
+ * Checks if the cause that was proposed for state update is consistent with the resulting updated state
+ * and not exception information was lost. The key observation here is that [getCancellationException] wraps
+ * exceptions that are not [CancellationException] into an instance of [JobCancellationException] and we allow
+ * that [JobCancellationException] to be unwrapped again when it reaches the coroutine that was cancelled.
+ *
+ * NOTE: equality comparison of exceptions is performed here by design, see equals of JobCancellationException
+ */
+ private fun incorporatedCause(update: Any?, proposedCause: Throwable) =
+ update is CompletedExceptionally && update.exception.let { ex ->
+ ex == proposedCause || proposedCause is JobCancellationException && ex == proposedCause.cause
+ }
+
// when Job is in Cancelling state, it can only be promoted to Cancelled state with the same cause
- // however, null cause can be replaced with more specific CancellationException (that contains stack trace)
- private fun coerceProposedUpdate(expect: Any, proposedUpdate: Any?): Any? =
- if (expect is Cancelling && !correspondinglyCancelled(expect, proposedUpdate))
+ // however, null cause can be replaced with more specific JobCancellationException (that contains better stack trace)
+ private fun coerceProposedUpdate(expect: Incomplete, proposedUpdate: Any?): Any? =
+ if (expect is Finishing && expect.cancelled != null && !correspondinglyCancelled(expect.cancelled, proposedUpdate))
expect.cancelled else proposedUpdate
- private fun correspondinglyCancelled(expect: Cancelling, proposedUpdate: Any?): Boolean {
+ private fun correspondinglyCancelled(cancelled: Cancelled, proposedUpdate: Any?): Boolean {
if (proposedUpdate !is Cancelled) return false
- return proposedUpdate.cause === expect.cancelled.cause ||
- proposedUpdate.cause is CancellationException && expect.cancelled.cause == null
+ return proposedUpdate.cause === cancelled.cause ||
+ proposedUpdate.cause is JobCancellationException && cancelled.cause == null
}
/**
* Tries to initiate update of the current [state] of this job.
+ * @suppress **This is unstable API and it is subject to change.**
*/
- protected fun tryUpdateState(expect: Any, update: Any?): Boolean {
- require(expect is Incomplete && update !is Incomplete) // only incomplete -> completed transition is allowed
+ internal fun tryUpdateState(expect: Incomplete, update: Any?): Boolean {
+ require(update !is Incomplete) // only incomplete -> completed transition is allowed
if (!_state.compareAndSet(expect, update)) return false
// Unregister from parent job
parentHandle?.dispose() // volatile read parentHandle _after_ state was updated
@@ -557,23 +666,23 @@
/**
* Completes update of the current [state] of this job.
+ * @suppress **This is unstable API and it is subject to change.**
*/
- protected fun completeUpdateState(expect: Any, update: Any?, mode: Int) {
+ internal fun completeUpdateState(expect: Incomplete, update: Any?, mode: Int) {
// Invoke completion handlers
- val cause = (update as? CompletedExceptionally)?.cause
- when (expect) {
- is JobNode<*> -> try { // SINGLE/SINGLE+ state -- one completion handler (common case)
+ val exceptionally = update as? CompletedExceptionally
+ val cause = exceptionally?.cause
+ if (expect is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
+ try {
expect.invoke(cause)
} catch (ex: Throwable) {
- handleException(ex)
+ handleException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
}
- is NodeList -> notifyCompletion(expect, cause) // LIST state -- a list of completion handlers
- is Cancelling -> notifyCompletion(expect.list, cause) // has list, too
- // otherwise -- do nothing (it was Empty*)
- else -> check(expect is Empty)
+ } else {
+ expect.list?.notifyCompletion(cause)
}
// Do overridable processing after completion handlers
- if (expect !is Cancelling) onCancellation() // only notify when was not cancelling before
+ if (!expect.isCancelling) onCancellation(exceptionally) // only notify when was not cancelling before
afterCompletion(update, mode)
}
@@ -583,15 +692,16 @@
try {
node.invoke(cause)
} catch (ex: Throwable) {
- exception?.apply { addSuppressed(ex) } ?: run { exception = ex }
+ exception?.apply { addSuppressed(ex) } ?: run {
+ exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
+ }
}
-
}
exception?.let { handleException(it) }
}
- private fun notifyCompletion(list: NodeList, cause: Throwable?) =
- notifyHandlers<JobNode<*>>(list, cause)
+ private fun NodeList.notifyCompletion(cause: Throwable?) =
+ notifyHandlers<JobNode<*>>(this, cause)
private fun notifyCancellation(list: NodeList, cause: Throwable?) =
notifyHandlers<JobCancellationNode<*>>(list, cause)
@@ -618,10 +728,9 @@
return TRUE
}
is NodeList -> { // LIST -- a list of completion handlers (either new or active)
- if (state._active.value != 0) return FALSE
- if (!state._active.compareAndSet(0, 1)) return RETRY
- onStart()
- return TRUE
+ return state.tryMakeActive().also { result ->
+ if (result == TRUE) onStart()
+ }
}
else -> return FALSE // not a new state
}
@@ -632,16 +741,22 @@
*/
protected open fun onStart() {}
- public final override fun getCompletionException(): Throwable {
+ public final override fun getCancellationException(): CancellationException {
val state = this.state
- return when (state) {
- is Cancelling -> state.cancelled.exception
- is Incomplete -> error("Job was not completed or cancelled yet")
- is CompletedExceptionally -> state.exception
- else -> CancellationException("Job has completed normally")
+ return when {
+ state is Finishing && state.cancelled != null ->
+ state.cancelled.exception.toCancellationException("Job is being cancelled")
+ state is Incomplete ->
+ error("Job was not completed or cancelled yet: $this")
+ state is CompletedExceptionally ->
+ state.exception.toCancellationException("Job has failed")
+ else -> JobCancellationException("Job has completed normally", null, this)
}
}
+ private fun Throwable.toCancellationException(message: String): CancellationException =
+ this as? CancellationException ?: JobCancellationException(message, this, this@JobSupport)
+
/**
* Returns the cause that signals the completion of this job -- it returns the original
* [cancel] cause or **`null` if this job had completed
@@ -651,20 +766,25 @@
*/
protected fun getCompletionCause(): Throwable? {
val state = this.state
- return when (state) {
- is Cancelling -> state.cancelled.cause
- is Incomplete -> error("Job was not completed or cancelled yet")
- is CompletedExceptionally -> state.cause
+ return when {
+ state is Finishing && state.cancelled != null -> state.cancelled.cause
+ state is Incomplete -> error("Job was not completed or cancelled yet")
+ state is CompletedExceptionally -> state.cause
else -> null
}
}
+ @Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
installHandler(handler, onCancelling = false)
+ @Suppress("OverridingDeprecatedMember")
public final override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle =
installHandler(handler, onCancelling = onCancelling && hasCancellingState)
+ public final override fun invokeOnCompletion(onCancelling: Boolean, handler: CompletionHandler): DisposableHandle =
+ installHandler(handler, onCancelling = onCancelling && hasCancellingState)
+
private fun installHandler(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle {
var nodeCache: JobNode<*>? = null
loopOnState { state ->
@@ -677,22 +797,21 @@
} else
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
}
- is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
- promoteSingleToNodeList(state)
- }
- is NodeList -> { // LIST -- a list of completion handlers (either new or active)
- val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
- if (addLastAtomic(state, state, node)) return node
- }
- is Cancelling -> { // CANCELLING -- has a list of completion handlers
- if (onCancelling) { // installing cancellation handler on job that is being cancelled
- handler((state as? CompletedExceptionally)?.exception)
- return NonDisposableHandle
+ is Incomplete -> {
+ val list = state.list
+ if (list == null) { // SINGLE/SINGLE+
+ promoteSingleToNodeList(state as JobNode<*>)
+ } else {
+ if (state is Finishing && state.cancelled != null && onCancelling) {
+ // installing cancellation handler on job that is being cancelled
+ handler((state as? CompletedExceptionally)?.exception)
+ return NonDisposableHandle
+ }
+ val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
+ if (addLastAtomic(state, list, node)) return node
}
- val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
- if (addLastAtomic(state, state.list, node)) return node
}
- else -> { // is inactive
+ else -> { // is complete
handler((state as? CompletedExceptionally)?.exception)
return NonDisposableHandle
}
@@ -727,7 +846,12 @@
}
final override suspend fun join() {
- if (!joinInternal()) return // fast-path no wait
+ if (!joinInternal()) { // fast-path no wait
+ return suspendCoroutineOrReturn { cont ->
+ cont.context.checkCompletion()
+ Unit // do not suspend
+ }
+ }
return joinSuspend() // slow-path wait
}
@@ -739,7 +863,7 @@
}
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
- cont.disposeOnCompletion(invokeOnCompletion(ResumeOnCompletion(this, cont)))
+ cont.disposeOnCompletion(invokeOnCompletion(handler = ResumeOnCompletion(this, cont)))
}
final override val onJoin: SelectClause0
@@ -752,13 +876,15 @@
if (select.isSelected) return
if (state !is Incomplete) {
// already complete -- select result
- if (select.trySelect(null))
+ if (select.trySelect(null)) {
+ select.completion.context.checkCompletion() // always check for our completion
block.startCoroutineUndispatched(select.completion)
+ }
return
}
if (startInternal(state) == 0) {
// slow-path -- register waiter for completion
- select.disposeOnSelect(invokeOnCompletion(SelectJoinOnCompletion(this, select, block)))
+ select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(this, select, block)))
return
}
}
@@ -773,12 +899,12 @@
// try remove and revert back to empty state
if (_state.compareAndSet(state, EmptyActive)) return
}
- is NodeList, is Cancelling -> { // LIST or CANCELLING -- a list of completion handlers
- // remove node from the list
- node.remove()
+ is Incomplete -> { // may have a list of completion handlers
+ // remove node from the list if there is a list
+ if (state.list != null) node.remove()
return
}
- else -> return // it is inactive or Empty* (does not have any completion handlers)
+ else -> return // it is complete and does not have any completion handlers
}
}
}
@@ -793,7 +919,7 @@
// we will be dispatching coroutine to process its cancellation exception, so there is no need for
// an extra check for Job status in MODE_CANCELLABLE
private fun updateStateCancelled(state: Incomplete, cause: Throwable?) =
- updateState(state, Cancelled(cause), mode = MODE_ATOMIC_DEFAULT)
+ updateState(state, Cancelled(this, cause), mode = MODE_ATOMIC_DEFAULT)
// transitions to Cancelled state
private fun makeCancelled(cause: Throwable?): Boolean {
@@ -821,25 +947,112 @@
}
is NodeList -> { // LIST -- a list of completion handlers (either new or active)
if (state.isActive) {
- // try make it cancelling on the condition that we're still in this state
- if (_state.compareAndSet(state, Cancelling(state, Cancelled(cause)))) {
- notifyCancellation(state, cause)
- onCancellation()
- return true
- }
+ if (tryMakeCancelling(state, state.list, cause)) return true
} else {
// cancelling a non-started coroutine makes it immediately cancelled
if (updateStateCancelled(state, cause))
return true
}
}
- else -> { // is inactive or already cancelling
+ is Finishing -> { // Completing/Cancelling the job, may cancel
+ if (state.cancelled != null) return false // already cancelling
+ if (tryMakeCancelling(state, state.list, cause)) return true
+ }
+ else -> { // is inactive
return false
}
}
}
}
+ // try make expected state in cancelling on the condition that we're still in this state
+ private fun tryMakeCancelling(expect: Incomplete, list: NodeList, cause: Throwable?): Boolean {
+ val cancelled = Cancelled(this, cause)
+ if (!_state.compareAndSet(expect, Finishing(list, cancelled, false))) return false
+ notifyCancellation(list, cause)
+ onCancellation(cancelled)
+ return true
+ }
+
+ /**
+ * Returns:
+ * * `true` if state was updated to completed/cancelled;
+ * * `false` if made completing or it is cancelling and is waiting for children.
+ *
+ * @throws IllegalStateException if job is already complete or completing
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ internal fun makeCompleting(proposedUpdate: Any?, mode: Int): Boolean {
+ loopOnState { state ->
+ if (state !is Incomplete)
+ throw IllegalStateException("Job $this is already complete, but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
+ if (state is Finishing && state.completing)
+ throw IllegalStateException("Job $this is already completing, but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
+ val waitChild: Child = firstChild(state) ?: // or else complete immediately
+ if (updateState(state, proposedUpdate, mode)) return true else return@loopOnState
+ // switch to completing state
+ if (state is JobNode<*>) {
+ // must promote to list to make completing & retry
+ promoteSingleToNodeList(state)
+ } else {
+ val completing = Finishing(state.list!!, (state as? Finishing)?.cancelled, true)
+ if (_state.compareAndSet(state, completing)) {
+ waitForChild(waitChild, proposedUpdate)
+ return false
+ }
+ }
+ }
+ }
+
+ private val Any?.exceptionOrNull: Throwable?
+ get() = (this as? CompletedExceptionally)?.exception
+
+ private fun firstChild(state: Incomplete) =
+ state as? Child ?: state.list?.nextChild()
+
+ private fun waitForChild(waitChild: Child, proposedUpdate: Any?) {
+ waitChild.child.invokeOnCompletion(handler = ChildCompletion(this, waitChild, proposedUpdate))
+ }
+
+ internal fun continueCompleting(lastChild: Child, proposedUpdate: Any?) {
+ loopOnState { state ->
+ if (state !is Finishing)
+ throw IllegalStateException("Job $this is found in expected state while completing with $proposedUpdate", proposedUpdate.exceptionOrNull)
+ // figure out if we need to wait for next child
+ val waitChild = lastChild.nextChild() ?: // or else no more children
+ if (updateState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return else return@loopOnState
+ // wait for next child
+ waitForChild(waitChild, proposedUpdate)
+ return
+ }
+ }
+
+ private fun LockFreeLinkedListNode.nextChild(): Child? {
+ var cur = this
+ while (cur.isRemoved) cur = cur.prev.unwrap() // rollback to prev non-removed (or list head)
+ while (true) {
+ cur = cur.next.unwrap()
+ if (cur.isRemoved) continue
+ if (cur is Child) return cur
+ if (cur is NodeList) return null // checked all -- no more children
+ }
+ }
+
+ override fun attachChild(child: Job): DisposableHandle =
+ invokeOnCompletion(onCancelling = true, handler = Child(this, child))
+
+ public override fun cancelChildren(cause: Throwable?) {
+ val state = this.state
+ when (state) {
+ is Child -> state.child.cancel(cause)
+ is Incomplete -> state.list?.cancelChildrenList(cause)
+ }
+ }
+
+ private fun NodeList.cancelChildrenList(cause: Throwable?) {
+ forEach<Child> { it.child.cancel(cause) }
+ }
+
/**
* Override to process any exceptions that were encountered while invoking completion handlers
* installed via [invokeOnCompletion].
@@ -851,42 +1064,78 @@
/**
* It is invoked once when job is cancelled or is completed, similarly to [invokeOnCompletion] with
* `onCancelling` set to `true`.
+ * @param exceptionally not null when the the job was cancelled or completed exceptionally,
+ * null when it has completed normally.
+ * @suppress **This is unstable API and it is subject to change.**
*/
- protected open fun onCancellation() {}
+ protected open fun onCancellation(exceptionally: CompletedExceptionally?) {}
/**
* Override for post-completion actions that need to do something with the state.
* @param mode completion mode.
+ * @suppress **This is unstable API and it is subject to change.**
*/
protected open fun afterCompletion(state: Any?, mode: Int) {}
// for nicer debugging
- override fun toString(): String {
+ override final fun toString(): String =
+ "${nameString()}{${stateString()}}@${Integer.toHexString(System.identityHashCode(this))}"
+
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected open fun nameString(): String = this::class.java.simpleName
+
+ private fun stateString(): String {
val state = this.state
- val result = if (state is Incomplete) "" else "[$state]"
- return "${this::class.java.simpleName}{${stateToString(state)}}$result@${Integer.toHexString(System.identityHashCode(this))}"
+ return when (state) {
+ is Finishing -> buildString {
+ if (state.cancelled != null) append("Cancelling")
+ if (state.completing) append("Completing")
+ }
+ is Incomplete -> if (state.isActive) "Active" else "New"
+ is Cancelled -> "Cancelled"
+ is CompletedExceptionally -> "CompletedExceptionally"
+ else -> "Completed"
+ }
}
/**
- * Interface for incomplete [state] of a job.
+ * @suppress **This is unstable API and it is subject to change.**
*/
- public interface Incomplete {
+ internal interface Incomplete {
val isActive: Boolean
+ val list: NodeList? // is null only for Empty and JobNode incomplete state objects
}
- private class Cancelling(
- @JvmField val list: NodeList,
- @JvmField val cancelled: Cancelled
+ // Cancelling or Completing
+ private class Finishing(
+ override val list: NodeList,
+ @JvmField val cancelled: Cancelled?, /* != null when cancelling */
+ @JvmField val completing: Boolean /* true when completing */
) : Incomplete {
- override val isActive: Boolean get() = false
+ override val isActive: Boolean get() = cancelled == null
}
- private class NodeList(
+ private val Incomplete.isCancelling: Boolean
+ get() = this is Finishing && cancelled != null
+
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ internal class NodeList(
active: Boolean
) : LockFreeLinkedListHead(), Incomplete {
- val _active = atomic(if (active) 1 else 0)
+ private val _active = atomic(if (active) 1 else 0)
override val isActive: Boolean get() = _active.value != 0
+ override val list: NodeList get() = this
+
+ fun tryMakeActive(): Int {
+ if (_active.value != 0) return FALSE
+ if (_active.compareAndSet(0, 1)) return RETRY
+ return TRUE
+ }
override fun toString(): String = buildString {
append("List")
@@ -904,32 +1153,52 @@
/**
* Class for a [state] of a job that had completed exceptionally, including cancellation.
*
- * @param cause the exceptional completion cause. If `cause` is null, then a [CancellationException]
- * if created on first get from [exception] property.
+ * @param cause the exceptional completion cause. If `cause` is null, then an exception is
+ * if created via [createException] on first get from [exception] property.
+ * @param allowNullCause if `null` cause is allowed.
*/
- public open class CompletedExceptionally(
- @JvmField val cause: Throwable?
+ public open class CompletedExceptionally protected constructor(
+ public @JvmField val cause: Throwable?,
+ allowNullCause: Boolean
) {
+ /**
+ * Creates exceptionally completed state.
+ * @param cause the exceptional completion cause.
+ */
+ public constructor(cause: Throwable) : this(cause, false)
+
@Volatile
- private var _exception: Throwable? = cause // materialize CancellationException on first need
+ private var _exception: Throwable? = cause // will materialize JobCancellationException on first need
+
+ init {
+ require(allowNullCause || cause != null) { "Null cause is not allowed" }
+ }
/**
* Returns completion exception.
*/
public val exception: Throwable get() =
_exception ?: // atomic read volatile var or else create new
- CancellationException("Job was cancelled").also { _exception = it }
+ createException().also { _exception = it }
+
+ protected open fun createException(): Throwable = error("Completion exception was not specified")
override fun toString(): String = "${this::class.java.simpleName}[$exception]"
}
/**
* A specific subclass of [CompletedExceptionally] for cancelled jobs.
+ *
+ * @param job the job that was cancelled.
+ * @param cause the exceptional completion cause. If `cause` is null, then a [JobCancellationException]
+ * if created on first get from [exception] property.
*/
public class Cancelled(
+ private val job: Job,
cause: Throwable?
- ) : CompletedExceptionally(cause)
-
+ ) : CompletedExceptionally(cause, true) {
+ override fun createException(): Throwable = JobCancellationException("Job was cancelled normally", null, job)
+ }
/*
* =================================================================================================
@@ -941,6 +1210,12 @@
public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
+ public fun getCompletionExceptionOrNull(): Throwable? {
+ val state = this.state
+ check(state !is Incomplete) { "This job has not completed yet" }
+ return state.exceptionOrNull
+ }
+
protected fun getCompletedInternal(): Any? {
val state = this.state
check(state !is Incomplete) { "This job has not completed yet" }
@@ -991,7 +1266,7 @@
}
if (startInternal(state) == 0) {
// slow-path -- register waiter for completion
- select.disposeOnSelect(invokeOnCompletion(SelectAwaitOnCompletion(this, select, block)))
+ select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(this, select, block)))
return
}
}
@@ -1007,19 +1282,17 @@
}
}
-internal fun stateToString(state: Any?): String =
- if (state is JobSupport.Incomplete)
- if (state.isActive) "Active" else "New"
- else "Completed"
-
private const val RETRY = -1
private const val FALSE = 0
private const val TRUE = 1
+@Suppress("PrivatePropertyName")
private val EmptyNew = Empty(false)
+@Suppress("PrivatePropertyName")
private val EmptyActive = Empty(true)
private class Empty(override val isActive: Boolean) : JobSupport.Incomplete {
+ override val list: JobSupport.NodeList? get() = null
override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
}
@@ -1033,6 +1306,7 @@
@JvmField val job: J
) : LockFreeLinkedListNode(), DisposableHandle, CompletionHandler, JobSupport.Incomplete {
final override val isActive: Boolean get() = true
+ final override val list: JobSupport.NodeList? get() = null
final override fun dispose() = (job as JobSupport).removeNode(this)
override abstract fun invoke(reason: Throwable?)
}
@@ -1099,22 +1373,42 @@
// -------- invokeOnCancellation nodes
-internal abstract class JobCancellationNode<out J : Job>(job: J) : JobNode<J>(job) {
- // shall be invoked at most once, so here is an additional flag
- private val _invoked = atomic(0)
-
- final override fun invoke(reason: Throwable?) {
- if (_invoked.compareAndSet(0, 1)) invokeOnce(reason)
- }
-
- abstract fun invokeOnce(reason: Throwable?)
-}
+/**
+ * Marker for node that shall be invoked on cancellation (in _cancelling_ state).
+ * **Note: may be invoked multiple times during cancellation.**
+ */
+internal abstract class JobCancellationNode<out J : Job>(job: J) : JobNode<J>(job)
private class InvokeOnCancellation(
job: Job,
private val handler: CompletionHandler
) : JobCancellationNode<Job>(job) {
- override fun invokeOnce(reason: Throwable?) = handler.invoke(reason)
+ // delegate handler shall be invoked at most once, so here is an additional flag
+ private val _invoked = atomic(0)
+ override fun invoke(reason: Throwable?) {
+ if (_invoked.compareAndSet(0, 1)) handler.invoke(reason)
+ }
override fun toString() = "InvokeOnCancellation[${handler::class.java.name}@${Integer.toHexString(System.identityHashCode(handler))}]"
}
+internal class Child(
+ parent: JobSupport,
+ val child: Job
+) : JobCancellationNode<JobSupport>(parent) {
+ override fun invoke(reason: Throwable?) {
+ // Always materialize the actual instance of parent's completion exception and cancel child with it
+ child.cancel(job.getCancellationException())
+ }
+ override fun toString(): String = "Child[$child]"
+}
+
+private class ChildCompletion(
+ private val parent: JobSupport,
+ private val waitChild: Child,
+ private val proposedUpdate: Any?
+) : JobNode<Job>(waitChild.child) {
+ override fun invoke(reason: Throwable?) {
+ parent.continueCompleting(waitChild, proposedUpdate)
+ }
+}
+
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
index 0c76c96..e47f7b7 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
@@ -53,14 +53,25 @@
get() = throw UnsupportedOperationException("This job is always active")
/** Always throws [IllegalStateException]. */
- override fun getCompletionException(): CancellationException = throw IllegalStateException("This job is always active")
+ override fun getCancellationException(): CancellationException = throw IllegalStateException("This job is always active")
/** Always returns [NonDisposableHandle]. */
+ @Suppress("OverridingDeprecatedMember")
override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle = NonDisposableHandle
/** Always returns [NonDisposableHandle]. */
+ @Suppress("OverridingDeprecatedMember")
override fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle = NonDisposableHandle
+ /** Always returns [NonDisposableHandle]. */
+ override fun invokeOnCompletion(onCancelling: Boolean, handler: CompletionHandler): DisposableHandle = NonDisposableHandle
+
/** Always returns `false`. */
override fun cancel(cause: Throwable?): Boolean = false
+
+ /** Always returns [NonDisposableHandle] and does not do anything. */
+ override fun attachChild(child: Job): DisposableHandle = NonDisposableHandle
+
+ /** Does not do anything. */
+ override fun cancelChildren(cause: Throwable?) {}
}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ResumeMode.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ResumeMode.kt
index 6729978..e7f0e1f 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ResumeMode.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ResumeMode.kt
@@ -22,6 +22,7 @@
@PublishedApi internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
@PublishedApi internal const val MODE_DIRECT = 2 // when the context is right just invoke the delegate continuation direct
@PublishedApi internal const val MODE_UNDISPATCHED = 3 // when the thread is right, but need to mark it with current coroutine
+@PublishedApi internal const val MODE_IGNORE = 4 // don't do anything
fun <T> Continuation<T>.resumeMode(value: T, mode: Int) {
when (mode) {
@@ -29,6 +30,7 @@
MODE_CANCELLABLE -> resumeCancellable(value)
MODE_DIRECT -> resumeDirect(value)
MODE_UNDISPATCHED -> (this as DispatchedContinuation).resumeUndispatched(value)
+ MODE_IGNORE -> {}
else -> error("Invalid mode $mode")
}
}
@@ -39,6 +41,7 @@
MODE_CANCELLABLE -> resumeCancellableWithException(exception)
MODE_DIRECT -> resumeDirectWithException(exception)
MODE_UNDISPATCHED -> (this as DispatchedContinuation).resumeUndispatchedWithException(exception)
+ MODE_IGNORE -> {}
else -> error("Invalid mode $mode")
}
}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
index 700bf1d..699c028 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
@@ -16,11 +16,12 @@
package kotlinx.coroutines.experimental
+import kotlinx.coroutines.experimental.JobSupport.CompletedExceptionally
import kotlinx.coroutines.experimental.selects.SelectBuilder
import kotlinx.coroutines.experimental.selects.select
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.Continuation
-import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
@@ -29,9 +30,9 @@
* [TimeoutCancellationException] if timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
- * cancellable suspending function inside the block throws [TimeoutCancellationException], so normally that exception,
- * if uncaught, also gets thrown by `withTimeout` as a result.
- * However, the code in the block can suppress [TimeoutCancellationException].
+ * cancellable suspending function inside the block throws [TimeoutCancellationException].
+ * Even if the code in the block suppresses [TimeoutCancellationException], it
+ * is still thrown by `withTimeout` invocation.
*
* The sibling function that does not throw exception on timeout is [withTimeoutOrNull].
* Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
@@ -42,31 +43,68 @@
* @param time timeout time
* @param unit timeout unit (milliseconds by default)
*/
-public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T {
+public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend CoroutineScope.() -> T): T {
require(time >= 0) { "Timeout time $time cannot be negative" }
if (time <= 0L) throw CancellationException("Timed out immediately")
return suspendCoroutineOrReturn { cont: Continuation<T> ->
- val context = cont.context
- val completion = TimeoutCompletion(time, unit, cont)
- // schedule cancellation of this coroutine on time
- completion.disposeOnCompletion(context.delay.invokeOnTimeout(time, unit, completion))
- completion.initParentJob(context[Job])
- // restart block using new coroutine with new job,
- // however start it as undispatched coroutine, because we are already in the proper context
- block.startCoroutineUninterceptedOrReturn(completion)
+ setupTimeout(TimeoutCoroutine(time, unit, cont), block)
}
}
-private open class TimeoutCompletion<U, in T: U>(
- private val time: Long,
- private val unit: TimeUnit,
- @JvmField protected val cont: Continuation<U>
-) : JobSupport(active = true), Runnable, Continuation<T> {
+private fun <U, T: U> setupTimeout(
+ coroutine: TimeoutCoroutine<U, T>,
+ block: suspend CoroutineScope.() -> T
+): Any? {
+ // schedule cancellation of this coroutine on time
+ val cont = coroutine.cont
+ val context = cont.context
+ coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine.unit, coroutine))
+ coroutine.initParentJob(context[Job])
+ // restart block using new coroutine with new job,
+ // however start it as undispatched coroutine, because we are already in the proper context
+ val result = try {
+ block.startCoroutineUninterceptedOrReturn(receiver = coroutine, completion = coroutine)
+ } catch (e: Throwable) {
+ CompletedExceptionally(e)
+ }
+ return when {
+ result == COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED
+ coroutine.makeCompleting(result, MODE_IGNORE) -> {
+ if (result is CompletedExceptionally) throw result.exception else result
+ }
+ else -> COROUTINE_SUSPENDED
+ }
+}
+
+/**
+ * @suppress **Deprecated**: for binary compatibility only
+ */
+@Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T =
+ withTimeout(time, unit) { block() }
+
+private open class TimeoutCoroutine<U, in T: U>(
+ @JvmField val time: Long,
+ @JvmField val unit: TimeUnit,
+ @JvmField val cont: Continuation<U>
+) : AbstractCoroutine<T>(cont.context, active = true), Runnable, Continuation<T> {
+ override val defaultResumeMode: Int get() = MODE_DIRECT
+
@Suppress("LeakingThis")
- override val context: CoroutineContext = cont.context + this // mix in this Job into the context
- override fun run() { cancel(TimeoutCancellationException(time, unit, this)) }
- override fun resume(value: T) { cont.resumeDirect(value) }
- override fun resumeWithException(exception: Throwable) { cont.resumeDirectWithException(exception) }
+ override fun run() {
+ cancel(TimeoutCancellationException(time, unit, this))
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ override fun afterCompletion(state: Any?, mode: Int) {
+ if (state is CompletedExceptionally)
+ cont.resumeWithExceptionMode(state.exception, mode)
+ else
+ cont.resumeMode(state as T, mode)
+ }
+
+ override fun nameString(): String =
+ "${super.nameString()}($time $unit)"
}
/**
@@ -74,9 +112,9 @@
* `null` if this timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
- * cancellable suspending function inside the block throws [TimeoutCancellationException]. Normally that exception,
- * if uncaught by the block, gets converted into the `null` result of `withTimeoutOrNull`.
- * However, the code in the block can suppress [TimeoutCancellationException].
+ * cancellable suspending function inside the block throws [TimeoutCancellationException].
+ * Even if the code in the block suppresses [TimeoutCancellationException], this
+ * invocation of `withTimeoutOrNull` still returns `null`.
*
* The sibling function that throws exception on timeout is [withTimeout].
* Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
@@ -87,36 +125,35 @@
* @param time timeout time
* @param unit timeout unit (milliseconds by default)
*/
-public suspend fun <T> withTimeoutOrNull(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T? {
+public suspend fun <T> withTimeoutOrNull(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend CoroutineScope.() -> T): T? {
require(time >= 0) { "Timeout time $time cannot be negative" }
if (time <= 0L) return null
return suspendCoroutineOrReturn { cont: Continuation<T?> ->
- val context = cont.context
- val completion = TimeoutOrNullCompletion(time, unit, cont)
- // schedule cancellation of this coroutine on time
- completion.disposeOnCompletion(context.delay.invokeOnTimeout(time, unit, completion))
- completion.initParentJob(context[Job])
- // restart block using new coroutine with new job,
- // however start it as undispatched coroutine, because we are already in the proper context
- try {
- block.startCoroutineUninterceptedOrReturn(completion)
- } catch (e: TimeoutCancellationException) {
- // replace inner timeout exception on our coroutine with null result
- if (e.coroutine == completion) null else throw e
- }
+ setupTimeout(TimeoutOrNullCoroutine(time, unit, cont), block)
}
}
-private class TimeoutOrNullCompletion<T>(
+/**
+ * @suppress **Deprecated**: for binary compatibility only
+ */
+@Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+public suspend fun <T> withTimeoutOrNull(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T? =
+ withTimeoutOrNull(time, unit) { block() }
+
+private class TimeoutOrNullCoroutine<T>(
time: Long,
unit: TimeUnit,
cont: Continuation<T?>
-) : TimeoutCompletion<T?, T>(time, unit, cont) {
- override fun resumeWithException(exception: Throwable) {
- // suppress inner timeout exception and replace it with null
- if (exception is TimeoutCancellationException && exception.coroutine === this)
- cont.resumeDirect(null) else
- cont.resumeDirectWithException(exception)
+) : TimeoutCoroutine<T?, T>(time, unit, cont) {
+ @Suppress("UNCHECKED_CAST")
+ override fun afterCompletion(state: Any?, mode: Int) {
+ if (state is CompletedExceptionally) {
+ val exception = state.exception
+ if (exception is TimeoutCancellationException && exception.coroutine === this)
+ cont.resumeMode(null, mode) else
+ cont.resumeWithExceptionMode(exception, mode)
+ } else
+ cont.resumeMode(state as T, mode)
}
}
@@ -140,6 +177,7 @@
@Deprecated("Renamed to TimeoutCancellationException", replaceWith = ReplaceWith("TimeoutCancellationException"))
public open class TimeoutException(message: String) : CancellationException(message)
+@Suppress("FunctionName")
private fun TimeoutCancellationException(
time: Long,
unit: TimeUnit,
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
index bcf941e..c088b62 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
@@ -18,7 +18,6 @@
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
-import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.experimental.CoroutineContext
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
index aa2ce84..80705b3 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
@@ -16,6 +16,7 @@
package kotlinx.coroutines.experimental
+import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
@@ -29,10 +30,14 @@
*/
suspend fun yield(): Unit = suspendCoroutineOrReturn sc@ { cont ->
val context = cont.context
- val job = context[Job]
- if (job != null && !job.isActive) throw job.getCompletionException()
+ context.checkCompletion()
if (cont !is DispatchedContinuation<Unit>) return@sc Unit
if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
cont.dispatchYield(Unit)
COROUTINE_SUSPENDED
}
+
+internal fun CoroutineContext.checkCompletion() {
+ val job = get(Job)
+ if (job != null && !job.isActive) throw job.getCancellationException()
+}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
index 0f0c55f..e38da5c 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
@@ -309,16 +309,17 @@
private fun initCancellability() {
val parent = context[Job] ?: return
- val newRegistration = parent.invokeOnCompletion(SelectOnCancellation(parent), onCancelling = true)
+ val newRegistration = parent.invokeOnCompletion(onCancelling = true, handler = SelectOnCancellation(parent))
parentHandle = newRegistration
// now check our state _after_ registering
if (isSelected) newRegistration.dispose()
}
private inner class SelectOnCancellation(job: Job) : JobCancellationNode<Job>(job) {
- override fun invokeOnce(reason: Throwable?) {
+ // Note: may be invoked multiple times, but only the first trySelect succeeds anyway
+ override fun invoke(reason: Throwable?) {
if (trySelect(null))
- resumeSelectCancellableWithException(reason ?: CancellationException("Select was cancelled"))
+ resumeSelectCancellableWithException(job.getCancellationException())
}
override fun toString(): String = "SelectOnCancellation[${this@SelectBuilderImpl}]"
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt
index 4dd92a6..f9bfc75 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-01.kt
@@ -29,6 +29,6 @@
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
- delay(1300L) // delay a bit to ensure it was cancelled indeed
+ job.join() // waits for job's completion
println("main: Now I can quit.")
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt
index 1e99b34..39f876d 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt
@@ -24,7 +24,7 @@
val job = launch(CommonPool) {
var nextPrintTime = startTime
var i = 0
- while (i < 10) { // computation loop, just wastes CPU
+ while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
@@ -34,7 +34,6 @@
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
- job.cancel() // cancels the job
- delay(1300L) // delay a bit to see if it was cancelled....
+ job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt
index 8746d38..d0c9955 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt
@@ -34,7 +34,6 @@
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
- job.cancel() // cancels the job
- delay(1300L) // delay a bit to see if it was cancelled....
+ job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt
index 28df084..286ed85 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt
@@ -32,7 +32,6 @@
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
- job.cancel() // cancels the job
- delay(1300L) // delay a bit to ensure it was cancelled indeed
+ job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt
index d379c08..2fdd026 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-05.kt
@@ -36,7 +36,6 @@
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
- job.cancel() // cancels the job
- delay(1300L) // delay a bit to ensure it was cancelled indeed
+ job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-07.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-07.kt
new file mode 100644
index 0000000..1a197a4
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-07.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package guide.cancel.example07
+
+import kotlinx.coroutines.experimental.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val result = withTimeoutOrNull(1300L) {
+ repeat(1000) { i ->
+ println("I'm sleeping $i ...")
+ delay(500L)
+ }
+ "Done" // will get cancelled before it produces this result
+ }
+ println("Result is $result")
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
index 95a1cac..b14666e 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-05.kt
@@ -37,4 +37,5 @@
println(prime)
cur = filter(coroutineContext, cur, prime)
}
+ coroutineContext.cancelChildren() // cancel all children to let main finish
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt
index 11cbc22..d10c5ed 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-07.kt
@@ -34,4 +34,5 @@
repeat(6) { // receive first six
println(channel.receive())
}
+ coroutineContext.cancelChildren() // cancel all children to let main finish
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt
index 81d97f7..096f435 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-08.kt
@@ -22,7 +22,7 @@
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>(4) // create buffered channel
- launch(coroutineContext) { // launch sender coroutine
+ val sender = launch(coroutineContext) { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
@@ -30,4 +30,5 @@
}
// don't receive anything... just wait....
delay(1000)
+ sender.cancel() // cancel sender coroutine
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-09.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-09.kt
index c9e744f..2a5e064 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-09.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-09.kt
@@ -28,7 +28,7 @@
launch(coroutineContext) { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
- table.receive() // game over, grab the ball
+ coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt
index 7cbfb21..0cdc167 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt
@@ -19,20 +19,17 @@
import kotlinx.coroutines.experimental.*
-fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
-
-fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
- log("Started main coroutine")
- // run two background value computations
- val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
- log("Computing v1")
- delay(500)
- 252
+fun main(args: Array<String>) = runBlocking<Unit> {
+ // start a coroutine to process some kind of incoming request
+ val request = launch(CommonPool) {
+ repeat(3) { i -> // launch a few children jobs
+ launch(coroutineContext) {
+ delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
+ println("Coroutine $i is done")
+ }
+ }
+ println("request: I'm done and I don't explicitly join my children that are still active")
}
- val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
- log("Computing v2")
- delay(1000)
- 6
- }
- log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
+ request.join() // wait for completion of the request, including all its children
+ println("Now processing of the request is complete")
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt
index 44ba5dc..80ed030 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt
@@ -19,19 +19,20 @@
import kotlinx.coroutines.experimental.*
-fun main(args: Array<String>) = runBlocking<Unit> {
- val job = Job() // create a job object to manage our lifecycle
- // now launch ten coroutines for a demo, each working for a different time
- val coroutines = List(10) { i ->
- // they are all children of our job object
- launch(coroutineContext + job) { // we use the context of main runBlocking thread, but with our own job object
- delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
- println("Coroutine $i is done")
- }
+fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
+
+fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
+ log("Started main coroutine")
+ // run two background value computations
+ val v1 = async(CommonPool + CoroutineName("v1coroutine")) {
+ log("Computing v1")
+ delay(500)
+ 252
}
- println("Launched ${coroutines.size} coroutines")
- delay(500L) // delay for half a second
- println("Cancelling job!")
- job.cancel() // cancel our job.. !!!
- delay(1000L) // delay for more to see if our coroutines are still working
+ val v2 = async(CommonPool + CoroutineName("v2coroutine")) {
+ log("Computing v2")
+ delay(1000)
+ 6
+ }
+ log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-10.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-10.kt
new file mode 100644
index 0000000..2222cf8
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-10.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package guide.context.example10
+
+import kotlinx.coroutines.experimental.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val job = Job() // create a job object to manage our lifecycle
+ // now launch ten coroutines for a demo, each working for a different time
+ val coroutines = List(10) { i ->
+ // they are all children of our job object
+ launch(coroutineContext + job) { // we use the context of main runBlocking thread, but with our own job object
+ delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
+ println("Coroutine $i is done")
+ }
+ }
+ println("Launched ${coroutines.size} coroutines")
+ delay(500L) // delay for half a second
+ println("Cancelling the job!")
+ job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt
index 3c952d6..9e46a7c 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-01.kt
@@ -53,4 +53,5 @@
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
+ coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt
index b3ff0e8..b128877 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-02.kt
@@ -48,4 +48,5 @@
repeat(8) { // print first eight results
println(selectAorB(a, b))
}
+ coroutineContext.cancelChildren()
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt
index e754904..acff08f 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-select-03.kt
@@ -20,8 +20,9 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
+import kotlin.coroutines.experimental.CoroutineContext
-fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
+fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {
for (num in 1..10) { // produce 10 numbers from 1 to 10
delay(100) // every 100 ms
select<Unit> {
@@ -36,9 +37,10 @@
launch(coroutineContext) { // this is a very fast consumer for the side channel
side.consumeEach { println("Side channel has $it") }
}
- produceNumbers(side).consumeEach {
+ produceNumbers(coroutineContext, side).consumeEach {
println("Consuming $it")
delay(250) // let us digest the consumed number properly, do not hurry
}
println("Done consuming")
+ coroutineContext.cancelChildren()
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt
index 279b51d..73fe116 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-sync-06.kt
@@ -21,6 +21,7 @@
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.experimental.sync.Mutex
+import kotlinx.coroutines.experimental.sync.withLock
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
val n = 1000 // number of coroutines to launch
@@ -41,9 +42,9 @@
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
- mutex.lock()
- try { counter++ }
- finally { mutex.unlock() }
+ mutex.withLock {
+ counter++
+ }
}
println("Counter = $counter")
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
index ebcc5fc..e4beeb6 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
@@ -73,7 +73,6 @@
"main: I'm tired of waiting!",
"I'm sleeping 3 ...",
"I'm sleeping 4 ...",
- "I'm sleeping 5 ...",
"main: Now I can quit."
)
}
@@ -125,6 +124,16 @@
}
@Test
+ fun testGuideCancelExample07() {
+ test("GuideCancelExample07") { guide.cancel.example07.main(emptyArray()) }.verifyLines(
+ "I'm sleeping 0 ...",
+ "I'm sleeping 1 ...",
+ "I'm sleeping 2 ...",
+ "Result is null"
+ )
+ }
+
+ @Test
fun testGuideComposeExample01() {
test("GuideComposeExample01") { guide.compose.example01.main(emptyArray()) }.verifyLinesArbitraryTime(
"The answer is 42",
@@ -197,7 +206,7 @@
@Test
fun testGuideContextExample05() {
test("GuideContextExample05") { guide.context.example05.main(emptyArray()) }.also { lines ->
- check(lines.size == 1 && lines[0].startsWith("My job is BlockingCoroutine{Active}@"))
+ check(lines.size == 1 && lines[0].startsWith("My job is \"coroutine#1\":BlockingCoroutine{Active}@"))
}
}
@@ -221,7 +230,18 @@
@Test
fun testGuideContextExample08() {
- test("GuideContextExample08") { guide.context.example08.main(emptyArray()) }.verifyLinesFlexibleThread(
+ test("GuideContextExample08") { guide.context.example08.main(emptyArray()) }.verifyLines(
+ "request: I'm done and I don't explicitly join my children that are still active",
+ "Coroutine 0 is done",
+ "Coroutine 1 is done",
+ "Coroutine 2 is done",
+ "Now processing of the request is complete"
+ )
+ }
+
+ @Test
+ fun testGuideContextExample09() {
+ test("GuideContextExample09") { guide.context.example09.main(emptyArray()) }.verifyLinesFlexibleThread(
"[main @main#1] Started main coroutine",
"[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1",
"[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2",
@@ -230,13 +250,12 @@
}
@Test
- fun testGuideContextExample09() {
- test("GuideContextExample09") { guide.context.example09.main(emptyArray()) }.verifyLines(
+ fun testGuideContextExample10() {
+ test("GuideContextExample10") { guide.context.example10.main(emptyArray()) }.verifyLines(
"Launched 10 coroutines",
"Coroutine 0 is done",
"Coroutine 1 is done",
- "Coroutine 2 is done",
- "Cancelling job!"
+ "Cancelling the job!"
)
}
@@ -340,8 +359,7 @@
"ping Ball(hits=1)",
"pong Ball(hits=2)",
"ping Ball(hits=3)",
- "pong Ball(hits=4)",
- "ping Ball(hits=5)"
+ "pong Ball(hits=4)"
)
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CompletableDeferredTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CompletableDeferredTest.kt
index 27ad4e4..65354b1 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CompletableDeferredTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CompletableDeferredTest.kt
@@ -18,6 +18,7 @@
import org.hamcrest.core.IsEqual
import org.hamcrest.core.IsInstanceOf
+import org.hamcrest.core.IsNull
import org.junit.Assert.assertThat
import org.junit.Assert.fail
import org.junit.Test
@@ -30,8 +31,9 @@
assertThat(c.isCancelled, IsEqual(false))
assertThat(c.isCompleted, IsEqual(false))
assertThat(c.isCompletedExceptionally, IsEqual(false))
- assertThrows<IllegalStateException> { c.getCompletionException() }
+ assertThrows<IllegalStateException> { c.getCancellationException() }
assertThrows<IllegalStateException> { c.getCompleted() }
+ assertThrows<IllegalStateException> { c.getCompletionExceptionOrNull() }
}
@Test
@@ -48,8 +50,9 @@
assertThat(c.isCancelled, IsEqual(false))
assertThat(c.isCompleted, IsEqual(true))
assertThat(c.isCompletedExceptionally, IsEqual(false))
- assertThat(c.getCompletionException(), IsInstanceOf(CancellationException::class.java))
+ assertThat(c.getCancellationException(), IsInstanceOf(JobCancellationException::class.java))
assertThat(c.getCompleted(), IsEqual("OK"))
+ assertThat(c.getCompletionExceptionOrNull(), IsNull())
}
@Test
@@ -66,8 +69,9 @@
assertThat(c.isCancelled, IsEqual(false))
assertThat(c.isCompleted, IsEqual(true))
assertThat(c.isCompletedExceptionally, IsEqual(true))
- assertThat(c.getCompletionException(), IsInstanceOf(TestException::class.java))
+ assertThat(c.getCancellationException(), IsInstanceOf(JobCancellationException::class.java))
assertThrows<TestException> { c.getCompleted() }
+ assertThat(c.getCompletionExceptionOrNull(), IsInstanceOf(TestException::class.java))
}
@Test
@@ -84,8 +88,9 @@
assertThat(c.isCancelled, IsEqual(true))
assertThat(c.isCompleted, IsEqual(true))
assertThat(c.isCompletedExceptionally, IsEqual(true))
- assertThat(c.getCompletionException(), IsInstanceOf(CancellationException::class.java))
+ assertThat(c.getCancellationException(), IsInstanceOf(CancellationException::class.java))
assertThrows<CancellationException> { c.getCompleted() }
+ assertThat(c.getCompletionExceptionOrNull(), IsInstanceOf(CancellationException::class.java))
}
@Test
@@ -102,8 +107,9 @@
assertThat(c.isCancelled, IsEqual(true))
assertThat(c.isCompleted, IsEqual(true))
assertThat(c.isCompletedExceptionally, IsEqual(true))
- assertThat(c.getCompletionException(), IsInstanceOf(TestException::class.java))
+ assertThat(c.getCancellationException(), IsInstanceOf(JobCancellationException::class.java))
assertThrows<TestException> { c.getCompleted() }
+ assertThat(c.getCompletionExceptionOrNull(), IsInstanceOf(TestException::class.java))
}
@Test
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CoroutinesTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CoroutinesTest.kt
index e88ac46..3408eeb 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CoroutinesTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CoroutinesTest.kt
@@ -21,20 +21,20 @@
class CoroutinesTest : TestBase() {
@Test
- fun testSimple() = runBlocking {
+ fun testSimple() = runTest {
expect(1)
finish(2)
}
@Test
- fun testYield() = runBlocking {
+ fun testYield() = runTest {
expect(1)
yield() // effectively does nothing, as we don't have other coroutines
finish(2)
}
@Test
- fun testLaunchAndYieldJoin() = runBlocking {
+ fun testLaunchAndYieldJoin() = runTest {
expect(1)
val job = launch(coroutineContext) {
expect(3)
@@ -49,7 +49,7 @@
}
@Test
- fun testLaunchUndispatched() = runBlocking {
+ fun testLaunchUndispatched() = runTest {
expect(1)
val job = launch(coroutineContext, start = CoroutineStart.UNDISPATCHED) {
expect(2)
@@ -64,7 +64,7 @@
}
@Test
- fun testNested() = runBlocking {
+ fun testNested() = runTest {
expect(1)
val j1 = launch(coroutineContext) {
expect(3)
@@ -81,20 +81,21 @@
}
@Test
- fun testCancelChildImplicit() = runBlocking {
+ fun testWaitChild() = runTest {
expect(1)
launch(coroutineContext) {
expect(3)
- yield() // parent finishes earlier, does not wait for us
- expectUnreached()
+ yield() // to parent
+ finish(5)
}
expect(2)
yield()
- finish(4)
+ expect(4)
+ // parent waits for child's completion
}
@Test
- fun testCancelChildExplicit() = runBlocking {
+ fun testCancelChildExplicit() = runTest {
expect(1)
val job = launch(coroutineContext) {
expect(3)
@@ -109,7 +110,7 @@
}
@Test
- fun testCancelChildWithFinally() = runBlocking {
+ fun testCancelChildWithFinally() = runTest {
expect(1)
val job = launch(coroutineContext) {
expect(3)
@@ -128,36 +129,42 @@
}
@Test
- fun testCancelNestedImplicit() = runBlocking {
+ fun testWaitNestedChild() = runTest {
expect(1)
launch(coroutineContext) {
expect(3)
launch(coroutineContext) {
expect(6)
- yield() // parent finishes earlier, does not wait for us
- expectUnreached()
+ yield() // to parent
+ expect(9)
}
expect(4)
yield()
expect(7)
- yield() // does not go further, because already cancelled
- expectUnreached()
+ yield() // to parent
+ finish(10) // the last one to complete
}
expect(2)
yield()
expect(5)
yield()
- finish(8)
+ expect(8)
+ // parent waits for child
}
- @Test(expected = IOException::class)
- fun testExceptionPropagation(): Unit = runBlocking {
+ @Test
+ fun testExceptionPropagation() = runTest(
+ expected = { it is IOException }
+ ) {
finish(1)
throw IOException()
}
- @Test(expected = IOException::class)
- fun testCancelParentOnChildException(): Unit = runBlocking {
+ @Test
+ fun testCancelParentOnChildException() = runTest(
+ expected = { it is IOException },
+ unhandled = listOf({ it -> it is IOException })
+ ) {
expect(1)
launch(coroutineContext) {
finish(3)
@@ -168,8 +175,14 @@
expectUnreached() // because of exception in child
}
- @Test(expected = IOException::class)
- fun testCancelParentOnNestedException(): Unit = runBlocking {
+ @Test
+ fun testCancelParentOnNestedException() = runTest(
+ expected = { it is IOException },
+ unhandled = listOf(
+ { it -> it is IOException },
+ { it -> it is IOException }
+ )
+ ) {
expect(1)
launch(coroutineContext) {
expect(3)
@@ -189,7 +202,7 @@
}
@Test
- fun testJoinWithFinally() = runBlocking {
+ fun testJoinWithFinally() = runTest {
expect(1)
val job = launch(coroutineContext) {
expect(3)
@@ -215,7 +228,7 @@
}
@Test
- fun testCancelAndJoin() = runBlocking {
+ fun testCancelAndJoin() = runTest {
expect(1)
val job = launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
try {
@@ -230,4 +243,27 @@
job.cancelAndJoin()
finish(5)
}
+
+ @Test
+ fun testCancelAndJoinChildCrash() = runTest(
+ expected = { it is IOException && it.message == "OK" },
+ unhandled = listOf({it -> it is IOException })
+ ) {
+ expect(1)
+ val job = launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ throw IOException("OK")
+ }
+ // now we have a failed job with IOException
+ finish(3)
+ try {
+ job.cancelAndJoin() // join should crash on child's exception but it will be wrapped into JobCancellationException
+ } catch (e: Throwable) {
+ e as JobCancellationException // type assertion
+ check(e.cause is IOException)
+ check(e.job === coroutineContext[Job])
+ throw e
+ }
+ expectUnreached()
+ }
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobDisposeTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobDisposeTest.kt
index bbb5bda..2cc1399 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobDisposeTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobDisposeTest.kt
@@ -51,7 +51,7 @@
threads += testThread("creator") {
while (!done) {
val job = TestJob()
- val handle = job.invokeOnCompletion({ /* nothing */ }, onCancelling = true)
+ val handle = job.invokeOnCompletion(onCancelling = true) { /* nothing */ }
this.job = job // post job to cancelling thread
this.handle = handle // post handle to concurrent disposer thread
handle.dispose() // dispose of handle from this thread (concurrently with other disposer)
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobTest.kt
index e8596d7..248c3b8 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobTest.kt
@@ -117,7 +117,8 @@
val tryCancel = Try<Unit> { job.cancel() }
check(!job.isActive)
for (i in 0 until n) assertEquals(1, fireCount[i])
- check(tryCancel.exception is TestException)
+ check(tryCancel.exception is CompletionHandlerException)
+ check(tryCancel.exception!!.cause is TestException)
}
@Test
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunTest.kt
index 0854209..87c36ec 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunTest.kt
@@ -24,10 +24,10 @@
class RunTest : TestBase() {
@Test
- fun testSameContextNoSuspend() = runBlocking<Unit> {
+ fun testSameContextNoSuspend() = runTest {
expect(1)
launch(coroutineContext) { // make sure there is not early dispatch here
- expectUnreached() // will terminate before it has a chance to start
+ finish(5) // after main exits
}
expect(2)
val result = run(coroutineContext) { // same context!
@@ -35,11 +35,12 @@
"OK"
}
assertThat(result, IsEqual("OK"))
- finish(4)
+ expect(4)
+ // will wait for the first coroutine
}
@Test
- fun testSameContextWithSuspend() = runBlocking<Unit> {
+ fun testSameContextWithSuspend() = runTest {
expect(1)
launch(coroutineContext) { // make sure there is not early dispatch here
expect(4)
@@ -56,10 +57,10 @@
}
@Test
- fun testCancelWithJobNoSuspend() = runBlocking<Unit> {
+ fun testCancelWithJobNoSuspend() = runTest {
expect(1)
launch(coroutineContext) { // make sure there is not early dispatch to here
- expectUnreached() // will terminate before it has a chance to start
+ finish(6) // after main exits
}
expect(2)
val job = Job()
@@ -75,11 +76,12 @@
"OK"
}
assertThat(result, IsEqual("OK"))
- finish(5)
+ expect(5)
+ // will wait for the first coroutine
}
@Test
- fun testCancelWithJobWithSuspend() = runBlocking<Unit> {
+ fun testCancelWithJobWithSuspend() = runTest {
expect(1)
launch(coroutineContext) { // make sure there is not early dispatch to here
expect(4)
@@ -104,7 +106,7 @@
}
@Test
- fun testCommonPoolNoSuspend() = runBlocking<Unit> {
+ fun testCommonPoolNoSuspend() = runTest {
expect(1)
val result = run(CommonPool) {
expect(2)
@@ -115,7 +117,7 @@
}
@Test
- fun testCommonPoolWithSuspend() = runBlocking<Unit> {
+ fun testCommonPoolWithSuspend() = runTest {
expect(1)
val result = run(CommonPool) {
expect(2)
@@ -127,8 +129,10 @@
finish(4)
}
- @Test(expected = CancellationException::class)
- fun testRunCancellableDefault() = runBlocking<Unit> {
+ @Test
+ fun testRunCancellableDefault() = runTest(
+ expected = { it is JobCancellationException }
+ ) {
val job = Job()
job.cancel() // cancel before it has a chance to run
run(job + wrapperDispatcher(coroutineContext)) {
@@ -136,8 +140,10 @@
}
}
- @Test(expected = CancellationException::class)
- fun testRunAtomicTryCancel() = runBlocking<Unit> {
+ @Test
+ fun testRunAtomicTryCancel() = runTest(
+ expected = { it is JobCancellationException }
+ ) {
expect(1)
val job = Job()
job.cancel() // try to cancel before it has a chance to run
@@ -148,8 +154,10 @@
}
}
- @Test(expected = CancellationException::class)
- fun testRunUndispatchedTryCancel() = runBlocking<Unit> {
+ @Test
+ fun testRunUndispatchedTryCancel() = runTest(
+ expected = { it is JobCancellationException }
+ ) {
expect(1)
val job = Job()
job.cancel() // try to cancel before it has a chance to run
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt
index f4cd14a..8327d37 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt
@@ -62,8 +62,8 @@
* Throws [IllegalStateException] like `error` in stdlib, but also ensures that the test will not
* complete successfully even if this exception is consumed somewhere in the test.
*/
- public fun error(message: Any): Nothing {
- val exception = IllegalStateException(message.toString())
+ public fun error(message: Any, cause: Throwable? = null): Nothing {
+ val exception = IllegalStateException(message.toString(), cause)
error.compareAndSet(null, exception)
throw exception
}
@@ -116,4 +116,35 @@
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
checkTestThreads(threadsBefore)
}
+
+ fun runTest(
+ expected: ((Throwable) -> Boolean)? = null,
+ unhandled: List<(Throwable) -> Boolean> = emptyList(),
+ block: suspend CoroutineScope.() -> Unit
+ ) {
+ var exCount = 0
+ var ex: Throwable? = null
+ try {
+ runBlocking(block = block, context = CoroutineExceptionHandler { context, e ->
+ if (e is CancellationException) return@CoroutineExceptionHandler // are ignored
+ exCount++
+ if (exCount > unhandled.size)
+ error("Too many unhandled exceptions $exCount, expected ${unhandled.size}, got: $e", e)
+ if (!unhandled[exCount - 1](e))
+ error("Unhandled exception was unexpected: $e", e)
+ context[Job]?.cancel(e)
+ })
+ } catch (e: Throwable) {
+ ex = e
+ if (expected != null) {
+ if (!expected(e))
+ error("Unexpected exception: $e", e)
+ } else
+ throw e
+ } finally {
+ if (ex == null && expected != null) error("Exception was expected but none produced")
+ }
+ if (unhandled != null && exCount < unhandled.size)
+ error("Too few unhandled exceptions $exCount, expected ${unhandled.size}")
+ }
}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutOrNullTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutOrNullTest.kt
index 4b0436f..540e0b6 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutOrNullTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutOrNullTest.kt
@@ -24,10 +24,40 @@
class WithTimeoutOrNullTest : TestBase() {
/**
+ * Tests a case of no timeout and no suspension inside.
+ */
+ @Test
+ fun testBasicNoSuspend() = runTest {
+ expect(1)
+ val result = withTimeoutOrNull(10_000) {
+ expect(2)
+ "OK"
+ }
+ assertThat(result, IsEqual("OK"))
+ finish(3)
+ }
+
+ /**
+ * Tests a case of no timeout and one suspension inside.
+ */
+ @Test
+ fun testBasicSuspend() = runTest {
+ expect(1)
+ val result = withTimeoutOrNull(10_000) {
+ expect(2)
+ yield()
+ expect(3)
+ "OK"
+ }
+ assertThat(result, IsEqual("OK"))
+ finish(4)
+ }
+
+ /**
* Tests property dispatching of `withTimeoutOrNull` blocks
*/
@Test
- fun testDispatch() = runBlocking {
+ fun testDispatch() = runTest {
expect(1)
launch(coroutineContext) {
expect(4)
@@ -49,7 +79,7 @@
}
@Test
- fun testNullOnTimeout() = runBlocking {
+ fun testNullOnTimeout() = runTest {
expect(1)
val result = withTimeoutOrNull(100) {
expect(2)
@@ -62,7 +92,7 @@
}
@Test
- fun testSuppressException() = runBlocking {
+ fun testSuppressException() = runTest {
expect(1)
val result = withTimeoutOrNull(100) {
expect(2)
@@ -73,24 +103,28 @@
}
"OK"
}
- assertThat(result, IsEqual("OK"))
+ assertThat(result, IsNull())
finish(4)
}
- @Test(expected = IOException::class)
- fun testReplaceException() = runBlocking {
+ @Test
+ fun testReplaceException() = runTest(
+ unhandled = listOf({ it -> it is UnexpectedCoroutineException && it.cause is IOException })
+ ) {
expect(1)
- withTimeoutOrNull(100) {
+ val result = withTimeoutOrNull(100) {
expect(2)
try {
delay(1000)
} catch (e: CancellationException) {
- finish(3)
+ expect(3)
throw IOException(e)
}
+ expectUnreached()
"OK"
}
- expectUnreached()
+ assertThat(result, IsNull())
+ finish(4)
}
/**
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutTest.kt
index f0f6ebd..b4bfdff 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutTest.kt
@@ -23,10 +23,40 @@
class WithTimeoutTest : TestBase() {
/**
+ * Tests a case of no timeout and no suspension inside.
+ */
+ @Test
+ fun testBasicNoSuspend() = runTest {
+ expect(1)
+ val result = withTimeout(10_000) {
+ expect(2)
+ "OK"
+ }
+ assertThat(result, IsEqual("OK"))
+ finish(3)
+ }
+
+ /**
+ * Tests a case of no timeout and one suspension inside.
+ */
+ @Test
+ fun testBasicSuspend() = runTest {
+ expect(1)
+ val result = withTimeout(10_000) {
+ expect(2)
+ yield()
+ expect(3)
+ "OK"
+ }
+ assertThat(result, IsEqual("OK"))
+ finish(4)
+ }
+
+ /**
* Tests proper dispatching of `withTimeout` blocks
*/
@Test
- fun testDispatch() = runBlocking {
+ fun testDispatch() = runTest {
expect(1)
launch(coroutineContext) {
expect(4)
@@ -49,7 +79,7 @@
@Test
- fun testExceptionOnTimeout() = runBlocking<Unit> {
+ fun testExceptionOnTimeout() = runTest {
expect(1)
try {
withTimeout(100) {
@@ -65,23 +95,27 @@
}
@Test
- fun testSuppressException() = runBlocking {
+ fun testSuppressException() = runTest(
+ expected = { it is CancellationException }
+ ) {
expect(1)
val result = withTimeout(100) {
expect(2)
try {
delay(1000)
} catch (e: CancellationException) {
- expect(3)
+ finish(3)
}
"OK"
}
- assertThat(result, IsEqual("OK"))
- finish(4)
+ expectUnreached()
}
- @Test(expected = IOException::class)
- fun testReplaceException() = runBlocking {
+ @Test
+ fun testReplaceException() = runTest(
+ expected = { it is CancellationException },
+ unhandled = listOf({ it -> it is UnexpectedCoroutineException && it.cause is IOException })
+ ) {
expect(1)
withTimeout(100) {
expect(2)
@@ -91,6 +125,7 @@
finish(3)
throw IOException(e)
}
+ expectUnreached()
"OK"
}
expectUnreached()
@@ -100,11 +135,29 @@
* Tests that a 100% CPU-consuming loop will react on timeout if it has yields.
*/
@Test(expected = CancellationException::class)
- fun testYieldBlockingWithTimeout() = runBlocking {
+ fun testYieldBlockingWithTimeout() = runTest {
withTimeout(100) {
while (true) {
yield()
}
}
}
+
+ /**
+ * Tests that [withTimeout] waits for children coroutines to complete.
+ */
+ @Test
+ fun testWithTimeoutChildWait() = runTest {
+ expect(1)
+ withTimeout(100) {
+ expect(2)
+ // launch child with timeout
+ launch(coroutineContext) {
+ expect(4)
+ }
+ expect(3)
+ // now will wait for child before returning
+ }
+ finish(5)
+ }
}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
new file mode 100644
index 0000000..f8eab64
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.TestBase
+import kotlinx.coroutines.experimental.runBlocking
+import org.junit.Test
+
+class ProduceTest : TestBase() {
+ @Test
+ fun testBasic() = runBlocking {
+ val c = produce(coroutineContext) {
+ send(1)
+ send(2)
+ }
+ check(c.receive() == 1)
+ check(c.receive() == 2)
+ check(c.receiveOrNull() == null)
+ }
+
+ @Test
+ fun testCancel() = runBlocking {
+ val c = produce(coroutineContext) {
+ send(1)
+ send(2)
+ expectUnreached()
+ }
+ check(c.receive() == 1)
+ c.cancel()
+ check(c.receiveOrNull() == null)
+ }
+}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectDeferredTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectDeferredTest.kt
index 99615b2..cbf1e6c 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectDeferredTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectDeferredTest.kt
@@ -128,4 +128,23 @@
finish(9)
}
+ @Test
+ fun testSelectCancel() = runTest(
+ expected = { it is JobCancellationException }
+ ) {
+ expect(1)
+ val d = CompletableDeferred<String>()
+ launch (coroutineContext) {
+ finish(3)
+ d.cancel() // will cancel after select starts
+ }
+ expect(2)
+ select<Unit> {
+ d.onAwait {
+ expectUnreached() // will not select
+ }
+ }
+ expectUnreached()
+ }
+
}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectJobTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectJobTest.kt
index 02f8f4a..9d38aaf 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectJobTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectJobTest.kt
@@ -22,10 +22,10 @@
class SelectJobTest : TestBase() {
@Test
- fun testSelectCompleted() = runBlocking<Unit> {
+ fun testSelectCompleted() = runTest {
expect(1)
launch(coroutineContext) { // makes sure we don't yield to it earlier
- expectUnreached() // will terminate before it has a chance to start
+ finish(4) // after main exits
}
val job = Job()
job.cancel()
@@ -34,11 +34,12 @@
expect(2)
}
}
- finish(3)
+ expect(3)
+ // will wait for the first coroutine
}
@Test
- fun testSelectIncomplete() = runBlocking<Unit> {
+ fun testSelectIncomplete() = runTest {
expect(1)
val job = Job()
launch(coroutineContext) { // makes sure we don't yield to it earlier
@@ -62,7 +63,7 @@
}
@Test
- fun testSelectLazy() = runBlocking<Unit> {
+ fun testSelectLazy() = runTest {
expect(1)
val job = launch(coroutineContext, CoroutineStart.LAZY) {
expect(2)
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectMutexTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectMutexTest.kt
index 0c0fb5a..4f60faf 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectMutexTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectMutexTest.kt
@@ -18,7 +18,6 @@
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.launch
-import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.sync.Mutex
import kotlinx.coroutines.experimental.sync.MutexImpl
import kotlinx.coroutines.experimental.yield
@@ -28,11 +27,11 @@
class SelectMutexTest : TestBase() {
@Test
- fun testSelectLock() = runBlocking<Unit> {
+ fun testSelectLock() = runTest {
val mutex = Mutex()
expect(1)
launch(coroutineContext) { // ensure that it is not scheduled earlier than needed
- expectUnreached() // will terminate before it has a chance to start
+ finish(4) // after main exits
}
val res = select<String> {
mutex.onLock {
@@ -42,11 +41,12 @@
}
}
assertEquals("OK", res)
- finish(3)
+ expect(3)
+ // will wait for the first coroutine
}
@Test
- fun testSelectLockWait() = runBlocking<Unit> {
+ fun testSelectLockWait() = runTest {
val mutex = Mutex(true) // locked
expect(1)
launch(coroutineContext) {
@@ -71,7 +71,7 @@
}
@Test
- fun testSelectCancelledResourceRelease() = runBlocking<Unit> {
+ fun testSelectCancelledResourceRelease() = runTest {
val n = 1_000 * stressTestMultiplier
val mutex = Mutex(true) as MutexImpl // locked
expect(1)
diff --git a/coroutines-guide.md b/coroutines-guide.md
index 58134de..308fd84 100644
--- a/coroutines-guide.md
+++ b/coroutines-guide.md
@@ -76,6 +76,7 @@
* [Job in the context](#job-in-the-context)
* [Children of a coroutine](#children-of-a-coroutine)
* [Combining contexts](#combining-contexts)
+ * [Parental responsibilities](#parental-responsibilities)
* [Naming coroutines for debugging](#naming-coroutines-for-debugging)
* [Cancellation via explicit job](#cancellation-via-explicit-job)
* [Channels](#channels)
@@ -322,7 +323,7 @@
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
- delay(1300L) // delay a bit to ensure it was cancelled indeed
+ job.join() // waits for job's completion
println("main: Now I can quit.")
}
```
@@ -342,6 +343,7 @@
<!--- TEST -->
As soon as main invokes `job.cancel`, we don't see any output from the other coroutine because it was cancelled.
+There is also an extension function [cancelAndJoin] that combines [cancel] and [join] invocations.
### Cancellation is cooperative
@@ -357,7 +359,7 @@
val job = launch(CommonPool) {
var nextPrintTime = startTime
var i = 0
- while (i < 10) { // computation loop, just wastes CPU
+ while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
@@ -367,15 +369,15 @@
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
- job.cancel() // cancels the job
- delay(1300L) // delay a bit to see if it was cancelled....
+ job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
```
> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-02.kt)
-Run it to see that it continues to print "I'm sleeping" even after cancellation.
+Run it to see that it continues to print "I'm sleeping" even after cancellation
+until the job completes by itself after five iterations.
<!--- TEST
I'm sleeping 0 ...
@@ -384,7 +386,6 @@
main: I'm tired of waiting!
I'm sleeping 3 ...
I'm sleeping 4 ...
-I'm sleeping 5 ...
main: Now I can quit.
-->
@@ -394,7 +395,7 @@
invoke a suspending function. There is a [yield] function that is a good choice for that purpose.
The other one is to explicitly check the cancellation status. Let us try the later approach.
-Replace `while (i < 10)` in the previous example with `while (isActive)` and rerun it.
+Replace `while (i < 5)` in the previous example with `while (isActive)` and rerun it.
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
@@ -412,15 +413,14 @@
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
- job.cancel() // cancels the job
- delay(1300L) // delay a bit to see if it was cancelled....
+ job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
```
> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-03.kt)
-As you can see, now this loop can be cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
+As you can see, now this loop is cancelled. [isActive][CoroutineScope.isActive] is a property that is available inside
the code of coroutines via [CoroutineScope] object.
<!--- TEST
@@ -451,15 +451,15 @@
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
- job.cancel() // cancels the job
- delay(1300L) // delay a bit to ensure it was cancelled indeed
+ job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
```
> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-04.kt)
-The example above produces the following output:
+Both [join] and [cancelAndJoin] wait for all the finalization actions to complete, so the example above
+produces the following output:
```text
I'm sleeping 0 ...
@@ -499,8 +499,7 @@
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
- job.cancel() // cancels the job
- delay(1300L) // delay a bit to ensure it was cancelled indeed
+ job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
```
@@ -557,7 +556,33 @@
Because cancellation is just an exception, all the resources will be closed in a usual way.
You can wrap the code with timeout in `try {...} catch (e: TimeoutCancellationException) {...}` block if
you need to do some additional action specifically on any kind of timeout or use [withTimeoutOrNull] function
-that is similar to [withTimeout], but returns `null` on timeout instead of throwing an exception.
+that is similar to [withTimeout], but returns `null` on timeout instead of throwing an exception:
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val result = withTimeoutOrNull(1300L) {
+ repeat(1000) { i ->
+ println("I'm sleeping $i ...")
+ delay(500L)
+ }
+ "Done" // will get cancelled before it produces this result
+ }
+ println("Result is $result")
+}
+```
+
+> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-cancel-07.kt)
+
+There is no longer an exception when running this code:
+
+```text
+I'm sleeping 0 ...
+I'm sleeping 1 ...
+I'm sleeping 2 ...
+Result is null
+```
+
+<!--- TEST -->
## Composing suspending functions
@@ -928,10 +953,10 @@
It produces something like
```
-My job is BlockingCoroutine{Active}@65ae6ba4
+My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
```
-<!--- TEST lines.size == 1 && lines[0].startsWith("My job is BlockingCoroutine{Active}@") -->
+<!--- TEST lines.size == 1 && lines[0].startsWith("My job is \"coroutine#1\":BlockingCoroutine{Active}@") -->
So, [isActive][CoroutineScope.isActive] in [CoroutineScope] is just a convenient shortcut for
`coroutineContext[Job]!!.isActive`.
@@ -1019,6 +1044,42 @@
<!--- TEST -->
+### Parental responsibilities
+
+A parent coroutine always waits for completion of all its children. Parent does not have to explicitly track
+all the children it launches and it does not have to use [join] to wait for them at the end:
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ // start a coroutine to process some kind of incoming request
+ val request = launch(CommonPool) {
+ repeat(3) { i -> // launch a few children jobs
+ launch(coroutineContext) {
+ delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
+ println("Coroutine $i is done")
+ }
+ }
+ println("request: I'm done and I don't explicitly join my children that are still active")
+ }
+ request.join() // wait for completion of the request, including all its children
+ println("Now processing of the request is complete")
+}
+```
+
+> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)
+
+The result is going to be:
+
+```text
+request: I'm done and I don't explicitly join my children that are still active
+Coroutine 0 is done
+Coroutine 1 is done
+Coroutine 2 is done
+Now processing of the request is complete
+```
+
+<!--- TEST -->
+
### Naming coroutines for debugging
Automatically assigned ids are good when coroutines log often and you just need to correlate log records
@@ -1049,7 +1110,7 @@
}
```
-> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-08.kt)
+> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)
The output it produces with `-Dkotlinx.coroutines.debug` JVM option is similar to:
@@ -1074,6 +1135,8 @@
the lifecycle of our activity. A job instance is created using [`Job()`][Job] factory function
as the following example shows. We need to make sure that all the coroutines are started
with this job in their context and then a single invocation of [Job.cancel] terminates them all.
+Moreover, [Job.join] waits for all of them to complete, so we can also use [cancelAndJoin] here in
+this example:
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
@@ -1082,19 +1145,18 @@
val coroutines = List(10) { i ->
// they are all children of our job object
launch(coroutineContext + job) { // we use the context of main runBlocking thread, but with our own job object
- delay(i * 200L) // variable delay 0ms, 200ms, 400ms, ... etc
+ delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
println("Coroutine $i is done")
}
}
println("Launched ${coroutines.size} coroutines")
delay(500L) // delay for half a second
- println("Cancelling job!")
- job.cancel() // cancel our job.. !!!
- delay(1000L) // delay for more to see if our coroutines are still working
+ println("Cancelling the job!")
+ job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
}
```
-> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-09.kt)
+> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-10.kt)
The output of this example is:
@@ -1102,16 +1164,17 @@
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
-Coroutine 2 is done
-Cancelling job!
+Cancelling the job!
```
<!--- TEST -->
As you can see, only the first three coroutines had printed a message and the others were cancelled
-by a single invocation of `job.cancel()`. So all we need to do in our hypothetical Android
+by a single invocation of `job.cancelAndJoin()`. So all we need to do in our hypothetical Android
application is to create a parent job object when activity is created, use it for child coroutines,
-and cancel it when activity is destroyed.
+and cancel it when activity is destroyed. We cannot `join` them in the case of Android lifecycle,
+since it is synchronous, but this joining ability is useful when building backend services to ensure bounded
+resource usage.
## Channels
@@ -1198,7 +1261,7 @@
to common sense that results must be returned from functions.
There is a convenience coroutine builder named [produce] that makes it easy to do it right on producer side,
-and an extension function [consumeEach], that can replace a `for` loop on the consumer side:
+and an extension function [consumeEach], that replaces a `for` loop on the consumer side:
```kotlin
fun produceSquares() = produce<Int>(CommonPool) {
@@ -1271,7 +1334,7 @@
[coroutines are like daemon threads](#coroutines-are-like-daemon-threads),
but in a larger app we'll need to stop our pipeline if we don't need it anymore.
Alternatively, we could have run pipeline coroutines as
-[children of a coroutine](#children-of-a-coroutine).
+[children of a coroutine](#children-of-a-coroutine) as is demonstrated in the following example.
### Prime numbers with pipeline
@@ -1307,7 +1370,10 @@
```
The following example prints the first ten prime numbers,
-running the whole pipeline in the context of the main thread:
+running the whole pipeline in the context of the main thread. Since all the coroutines are launched as
+children of the main [runBlocking] coroutine in its [coroutineContext][CoroutineScope.coroutineContext],
+we don't have to keep an explicit list of all the coroutine we have created.
+We use [CoroutineContext.cancelChildren] extension to cancel all the children coroutines.
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
@@ -1317,6 +1383,7 @@
println(prime)
cur = filter(coroutineContext, cur, prime)
}
+ coroutineContext.cancelChildren() // cancel all children to let main finish
}
```
@@ -1427,7 +1494,7 @@
```
Now, let us see what happens if we launch a couple of coroutines sending strings
-(in this example we launch them in the context of the main thread):
+(in this example we launch them in the context of the main thread as main coroutine's children):
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
@@ -1437,6 +1504,7 @@
repeat(6) { // receive first six
println(channel.receive())
}
+ coroutineContext.cancelChildren() // cancel all children to let main finish
}
```
@@ -1470,7 +1538,7 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>(4) // create buffered channel
- launch(coroutineContext) { // launch sender coroutine
+ val sender = launch(coroutineContext) { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
@@ -1478,6 +1546,7 @@
}
// don't receive anything... just wait....
delay(1000)
+ sender.cancel() // cancel sender coroutine
}
```
@@ -1497,7 +1566,6 @@
The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
-
### Channels are fair
Send and receive operations to channels are _fair_ with respect to the order of their invocation from
@@ -1514,7 +1582,7 @@
launch(coroutineContext) { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
- table.receive() // game over, grab the ball
+ coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
@@ -1538,11 +1606,13 @@
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
-ping Ball(hits=5)
```
<!--- TEST -->
+Note, that sometimes channels may produce executions that look unfair due to the nature of the executor
+that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details.
+
## Shared mutable state and concurrency
Coroutines can be executed concurrently using a multi-threaded dispatcher like [CommonPool]. It presents
@@ -1566,6 +1636,7 @@
<!--- INCLUDE .*/example-sync-06.kt
import kotlinx.coroutines.experimental.sync.Mutex
+import kotlinx.coroutines.experimental.sync.withLock
-->
<!--- INCLUDE .*/example-sync-07.kt
@@ -1758,15 +1829,18 @@
Coroutine's alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
delimit a critical section. The key difference is that `Mutex.lock` is a suspending function. It does not block a thread.
+There is also [Mutex.withLock] extension function that conveniently represents
+`mutex.lock(); try { ... } finally { mutex.unlock() }` pattern:
+
```kotlin
val mutex = Mutex()
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
- mutex.lock()
- try { counter++ }
- finally { mutex.unlock() }
+ mutex.withLock {
+ counter++
+ }
}
println("Counter = $counter")
}
@@ -1920,6 +1994,7 @@
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
+ coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
}
```
@@ -1979,6 +2054,7 @@
repeat(8) { // print first eight results
println(selectAorB(a, b))
}
+ coroutineContext.cancelChildren()
}
```
@@ -2017,8 +2093,12 @@
Let us write an example of producer of integers that sends its values to a `side` channel when
the consumers on its primary channel cannot keep up with it:
+<!--- INCLUDE
+import kotlin.coroutines.experimental.CoroutineContext
+-->
+
```kotlin
-fun produceNumbers(side: SendChannel<Int>) = produce<Int>(CommonPool) {
+fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) {
for (num in 1..10) { // produce 10 numbers from 1 to 10
delay(100) // every 100 ms
select<Unit> {
@@ -2037,11 +2117,12 @@
launch(coroutineContext) { // this is a very fast consumer for the side channel
side.consumeEach { println("Side channel has $it") }
}
- produceNumbers(side).consumeEach {
+ produceNumbers(coroutineContext, side).consumeEach {
println("Consuming $it")
delay(250) // let us digest the consumed number properly, do not hurry
}
println("Done consuming")
+ coroutineContext.cancelChildren()
}
```
@@ -2210,6 +2291,7 @@
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/delay.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
+[cancelAndJoin]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/cancel-and-join.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-cancellation-exception.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
[CoroutineScope.isActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/is-active.html
@@ -2230,6 +2312,7 @@
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
[CoroutineName]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-name/index.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/cancel.html
+[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html
[CompletableDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-completable-deferred/index.html
[Deferred.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/on-await.html
<!--- INDEX kotlinx.coroutines.experimental.sync -->
diff --git a/integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/time/Time.kt b/integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/time/Time.kt
index 4c8435e..7dd23ba 100644
--- a/integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/time/Time.kt
+++ b/integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/time/Time.kt
@@ -15,6 +15,7 @@
*/
package kotlinx.coroutines.experimental.time
+import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.selects.SelectBuilder
import java.time.Duration
import java.util.concurrent.TimeUnit
@@ -34,11 +35,25 @@
/**
* "java.time" adapter method for [kotlinx.coroutines.experimental.withTimeout]
*/
-public suspend fun <T> withTimeout(duration: Duration, block: suspend () -> T): T =
+public suspend fun <T> withTimeout(duration: Duration, block: suspend CoroutineScope.() -> T): T =
kotlinx.coroutines.experimental.withTimeout(duration.toNanos(), TimeUnit.NANOSECONDS, block)
/**
+ * @suppress **Deprecated**: for binary compatibility only
+ */
+@Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+public suspend fun <T> withTimeout(duration: Duration, block: suspend () -> T): T =
+ kotlinx.coroutines.experimental.withTimeout(duration.toNanos(), TimeUnit.NANOSECONDS) { block() }
+
+/**
* "java.time" adapter method for [kotlinx.coroutines.experimental.withTimeoutOrNull]
*/
-public suspend fun <T> withTimeoutOrNull(duration: Duration, block: suspend () -> T): T? =
+public suspend fun <T> withTimeoutOrNull(duration: Duration, block: suspend CoroutineScope.() -> T): T? =
kotlinx.coroutines.experimental.withTimeoutOrNull(duration.toNanos(), TimeUnit.NANOSECONDS, block)
+
+/**
+ * @suppress **Deprecated**: for binary compatibility only
+ */
+@Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
+public suspend fun <T> withTimeoutOrNull(duration: Duration, block: suspend () -> T): T? =
+ kotlinx.coroutines.experimental.withTimeoutOrNull(duration.toNanos(), TimeUnit.NANOSECONDS) { block() }
diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md
index 823be69..fd8e30a 100644
--- a/reactive/coroutines-guide-reactive.md
+++ b/reactive/coroutines-guide-reactive.md
@@ -482,6 +482,7 @@
subject.onNext("three")
subject.onNext("four")
yield() // yield the main thread to the launched coroutine <--- HERE
+ subject.onComplete() // now complete subject's sequence to cancel consumer, too
}
```
@@ -519,6 +520,7 @@
broadcast.offer("three")
broadcast.offer("four")
yield() // yield the main thread to the launched coroutine
+ broadcast.close() // now close broadcast channel to cancel consumer, too
}
```
@@ -756,7 +758,8 @@
to the [CoroutineScope.coroutineContext] that is provided by [publish] builder. This way, all the coroutines that are
being launched here are [children](../coroutines-guide.md#children-of-a-coroutine) of the `publish`
coroutine and will get cancelled when the `publish` coroutine is cancelled or is otherwise completed.
-This implementation completes as soon as the original publisher completes.
+Moreover, since parent coroutine waits until all children are complete, this implementation fully
+merges all the received streams.
For a test, let us start with `rangeWithInterval` function from the previous example and write a
producer that sends its results twice with some delay:
@@ -799,6 +802,7 @@
3
4
12
+13
```
<!--- TEST -->
diff --git a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt
index 224357c..6e07631 100644
--- a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt
@@ -19,7 +19,6 @@
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.experimental.AbstractCoroutine
import kotlinx.coroutines.experimental.Job
-import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
import kotlinx.coroutines.experimental.channels.ProducerScope
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.handleCoroutineException
@@ -77,9 +76,6 @@
override val isFull: Boolean = mutex.isLocked
override fun close(cause: Throwable?): Boolean = cancel(cause)
- private fun sendException() =
- (state as? CompletedExceptionally)?.cause ?: ClosedSendChannelException(CLOSED_MESSAGE)
-
override fun offer(element: T): Boolean {
if (!mutex.tryLock()) return false
doLockedNext(element)
@@ -115,7 +111,7 @@
// check if already closed for send
if (!isActive) {
doLockedSignalCompleted()
- throw sendException()
+ throw getCancellationException()
}
// notify subscriber
try {
@@ -127,7 +123,7 @@
} finally {
doLockedSignalCompleted()
}
- throw sendException()
+ throw getCancellationException()
}
// now update nRequested
while (true) { // lock-free loop on nRequested
@@ -197,7 +193,7 @@
}
}
- override fun onCancellation() {
+ override fun onCancellation(exceptionally: CompletedExceptionally?) {
while (true) { // lock-free loop for nRequested
val cur = _nRequested.value
if (cur == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt
index 5688d2d..848110d 100644
--- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt
@@ -19,7 +19,6 @@
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.experimental.AbstractCoroutine
import kotlinx.coroutines.experimental.Job
-import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
import kotlinx.coroutines.experimental.channels.ProducerScope
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.handleCoroutineException
@@ -60,7 +59,6 @@
block.startCoroutine(coroutine, coroutine)
}
-private const val CLOSED_MESSAGE = "This subscription had already closed (completed or failed)"
private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
@@ -79,9 +77,6 @@
override val isFull: Boolean = mutex.isLocked
override fun close(cause: Throwable?): Boolean = cancel(cause)
- private fun sendException() =
- (state as? CompletedExceptionally)?.cause ?: ClosedSendChannelException(CLOSED_MESSAGE)
-
override fun offer(element: T): Boolean {
if (!mutex.tryLock()) return false
doLockedNext(element)
@@ -117,7 +112,7 @@
// check if already closed for send
if (!isActive) {
doLockedSignalCompleted()
- throw sendException()
+ throw getCancellationException()
}
// notify subscriber
try {
@@ -129,7 +124,7 @@
} finally {
doLockedSignalCompleted()
}
- throw sendException()
+ throw getCancellationException()
}
// now update nRequested
while (true) { // lock-free loop on nRequested
@@ -199,7 +194,7 @@
}
}
- override fun onCancellation() {
+ override fun onCancellation(exceptionally: CompletedExceptionally?) {
while (true) { // lock-free loop for nRequested
val cur = _nRequested.value
if (cur == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt
index 2e81e23..c1e5bd2 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt
@@ -22,7 +22,6 @@
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.experimental.AbstractCoroutine
import kotlinx.coroutines.experimental.Job
-import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
import kotlinx.coroutines.experimental.channels.ProducerScope
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.handleCoroutineException
@@ -58,7 +57,6 @@
block.startCoroutine(coroutine, coroutine)
}
-private const val CLOSED_MESSAGE = "This subscription had already closed (completed or failed)"
private const val OPEN = 0 // open channel, still working
private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError
@@ -78,16 +76,13 @@
override val isFull: Boolean = mutex.isLocked
override fun close(cause: Throwable?): Boolean = cancel(cause)
- private fun sendException() =
- (state as? CompletedExceptionally)?.cause ?: ClosedSendChannelException(CLOSED_MESSAGE)
-
override fun offer(element: T): Boolean {
if (!mutex.tryLock()) return false
doLockedNext(element)
return true
}
- public suspend override fun send(element: T): Unit {
+ public suspend override fun send(element: T) {
// fast-path -- try send without suspension
if (offer(element)) return
// slow-path does suspend
@@ -116,7 +111,7 @@
// check if already closed for send
if (!isActive) {
doLockedSignalCompleted()
- throw sendException()
+ throw getCancellationException()
}
// notify subscriber
try {
@@ -128,7 +123,7 @@
} finally {
doLockedSignalCompleted()
}
- throw sendException()
+ throw getCancellationException()
}
/*
There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
@@ -162,7 +157,7 @@
}
}
- override fun onCancellation() {
+ override fun onCancellation(exceptionally: CompletedExceptionally?) {
if (!_signal.compareAndSet(OPEN, CLOSED)) return // abort, other thread invoked doLockedSignalCompleted
if (mutex.tryLock()) // if we can acquire the lock
doLockedSignalCompleted()
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-08.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-08.kt
index f9e1a85..dc78fff 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-08.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-08.kt
@@ -34,4 +34,5 @@
subject.onNext("three")
subject.onNext("four")
yield() // yield the main thread to the launched coroutine <--- HERE
+ subject.onComplete() // now complete subject's sequence to cancel consumer, too
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-09.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-09.kt
index 18d7768..34f88e3 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-09.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-09.kt
@@ -34,4 +34,5 @@
broadcast.offer("three")
broadcast.offer("four")
yield() // yield the main thread to the launched coroutine
+ broadcast.close() // now close broadcast channel to cancel consumer, too
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
index 7d84485..4597334 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
@@ -137,7 +137,8 @@
"11",
"3",
"4",
- "12"
+ "12",
+ "13"
)
}