Merge branch 'await-all' into develop
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt
index 67a7fc6..707d6c2 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractContinuation.kt
@@ -75,7 +75,7 @@
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state
- if (state is CompletedExceptionally) throw state.exception
+ if (state is CompletedExceptionally) throw state.cause
return getSuccessfulResult(state)
}
@@ -99,8 +99,8 @@
}
is Cancelled -> {
// Ignore resumes in cancelled coroutines, but handle exception if a different one here
- if (proposedUpdate is CompletedExceptionally && proposedUpdate.exception != state.exception)
- handleException(proposedUpdate.exception)
+ if (proposedUpdate is CompletedExceptionally && proposedUpdate.cause != state.cause)
+ handleException(proposedUpdate.cause)
return
}
else -> error("Already resumed, but got $proposedUpdate")
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt
index 5143c15..2078a3c 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt
@@ -76,8 +76,11 @@
* This function is invoked once when this coroutine is cancelled or is completed,
* similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
*
- * @param cause the cause that was passed to [Job.cancel] function or `null` if coroutine was cancelled
- * without cause or is completing normally.
+ * The meaning of [cause] parameter:
+ * * Cause is `null` when job has completed normally.
+ * * Cause is an instance of [CancellationException] when job was cancelled _normally_.
+ * **It should not be treated as an error**. In particular, it should not be reported to error logs.
+ * * Otherwise, the job had _failed_.
*/
protected open fun onCancellation(cause: Throwable?) {}
@@ -98,7 +101,7 @@
@Suppress("UNCHECKED_CAST")
internal override fun onCompletionInternal(state: Any?, mode: Int) {
if (state is CompletedExceptionally)
- onCompletedExceptionally(state.exception)
+ onCompletedExceptionally(state.cause)
else
onCompleted(state as T)
}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt
new file mode 100644
index 0000000..59285b3
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import kotlinx.atomicfu.atomic
+
+/**
+ * Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
+ * when all deferred computations are complete or resumes with the first thrown exception if any of computations
+ * complete exceptionally including cancellation.
+ *
+ * This function is **not** equivalent to `deferreds.map { it.await() }` which fails only when when it sequentially
+ * gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
+ * this function immediately resumes with [CancellationException].
+ */
+public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
+ if (deferreds.isEmpty()) emptyList() else AwaitAll(deferreds).await()
+
+/**
+ * Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
+ * when all deferred computations are complete or resumes with the first thrown exception if any of computations
+ * complete exceptionally including cancellation.
+ *
+ * This function is **not** equivalent to `this.map { it.await() }` which fails only when when it sequentially
+ * gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
+ * this function immediately resumes with [CancellationException].
+ */
+public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
+ if (isEmpty()) emptyList() else AwaitAll(toTypedArray()).await()
+
+/**
+ * Suspends current coroutine until all given jobs are complete.
+ * This method is semantically equivalent to joining all given jobs one by one with `jobs.forEach { it.join() }`.
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
+ * this function immediately resumes with [CancellationException].
+ */
+public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() }
+
+/**
+ * Suspends current coroutine until all given jobs are complete.
+ * This method is semantically equivalent to joining all given jobs one by one with `forEach { it.join() }`.
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
+ * this function immediately resumes with [CancellationException].
+ */
+public suspend fun Collection<Job>.joinAll(): Unit = forEach { it.join() }
+
+private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
+ private val notCompletedCount = atomic(deferreds.size)
+
+ suspend fun await(): List<T> = suspendCancellableCoroutine { cont ->
+ deferreds.forEach {
+ it.start() // To properly await lazily started deferreds
+ cont.disposeOnCompletion(it.invokeOnCompletion(AwaitAllNode(cont, it).asHandler))
+ }
+ }
+
+ inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
+ override fun invoke(cause: Throwable?) {
+ if (cause != null) {
+ val token = continuation.tryResumeWithException(cause)
+ if (token != null) {
+ continuation.completeResume(token)
+ }
+ } else if (notCompletedCount.decrementAndGet() == 0) {
+ continuation.resume(deferreds.map { it.getCompleted() })
+ }
+ }
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Builders.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Builders.common.kt
index 5c6d1f2..63f7fe2 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Builders.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Builders.common.kt
@@ -171,7 +171,7 @@
override fun hasOnFinishingHandler(update: Any?) = update is CompletedExceptionally
override fun onFinishingInternal(update: Any?) {
// note the use of the parent's job context below!
- if (update is CompletedExceptionally) handleCoroutineException(parentContext, update.exception)
+ if (update is CompletedExceptionally) handleCoroutineException(parentContext, update.cause)
}
}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt
index 8e3276e..31c5c6d 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletedExceptionally.kt
@@ -22,55 +22,28 @@
* Class for an internal state of a job that had completed exceptionally, including cancellation.
*
* **Note: This class cannot be used outside of internal coroutines framework**.
+ * **Note: cannot be internal until we get rid of MutableDelegateContinuation in IO**
*
- * @param cause the exceptional completion cause. If `cause` is null, then an exception is
- * if created via [createException] on first get from [exception] property.
- * @param allowNullCause if `null` cause is allowed.
+ * @param cause the exceptional completion cause. It's either original exceptional cause
+ * or artificial JobCancellationException if no cause was provided
* @suppress **This is unstable API and it is subject to change.**
*/
-public open class CompletedExceptionally protected constructor(
- @JvmField public val cause: Throwable?,
- allowNullCause: Boolean
+open class CompletedExceptionally(
+ @JvmField public val cause: Throwable
) {
- /**
- * Creates exceptionally completed state.
- * @param cause the exceptional completion cause.
- */
- public constructor(cause: Throwable) : this(cause, false)
-
- @Volatile
- private var _exception: Throwable? = cause // will materialize JobCancellationException on first need
-
- init {
- require(allowNullCause || cause != null) { "Null cause is not allowed" }
- }
-
- /**
- * Returns completion exception.
- */
- public val exception: Throwable get() =
- _exception ?: // atomic read volatile var or else create new
- createException().also { _exception = it }
-
- protected open fun createException(): Throwable = error("Completion exception was not specified")
-
- override fun toString(): String = "$classSimpleName[$exception]"
+ override fun toString(): String = "$classSimpleName[$cause]"
}
/**
* A specific subclass of [CompletedExceptionally] for cancelled jobs.
*
* **Note: This class cannot be used outside of internal coroutines framework**.
- *
+ *
* @param job the job that was cancelled.
- * @param cause the exceptional completion cause. If `cause` is null, then a [JobCancellationException]
- * if created on first get from [exception] property.
+ * @param cause the exceptional completion cause. If `cause` is null, then a [JobCancellationException] is created.
* @suppress **This is unstable API and it is subject to change.**
*/
-public class Cancelled(
+internal class Cancelled(
private val job: Job,
cause: Throwable?
-) : CompletedExceptionally(cause, true) {
- override fun createException(): Throwable = JobCancellationException("Job was cancelled normally", null, job)
-}
-
+) : CompletedExceptionally(cause ?: JobCancellationException("Job was cancelled normally", null, job))
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletionHandler.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletionHandler.common.kt
index dc2fd9a..e398fae 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletionHandler.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CompletionHandler.common.kt
@@ -24,6 +24,12 @@
* Installed handler should not throw any exceptions. If it does, they will get caught,
* wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code.
*
+ * The meaning of `cause` that is passed to the handler:
+ * * Cause is `null` when job has completed normally.
+ * * Cause is an instance of [CancellationException] when job was cancelled _normally_.
+ * **It should not be treated as an error**. In particular, it should not be reported to error logs.
+ * * Otherwise, the job had _failed_.
+ *
* **Note**: This type is a part of internal machinery that supports parent-child hierarchies
* and allows for implementation of suspending functions that wait on the Job's state.
* This type should not be used in general application code.
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index 3f15e8a..7f23fbd 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -152,7 +152,7 @@
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
* An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,,
* the resulting [Deferred] is created in _new_ state. It can be explicitly started with [start][Job.start]
- * function and will be started implicitly on the first invocation of [join][Job.join] or [await][Deferred.await].
+ * function and will be started implicitly on the first invocation of [join][Job.join], [await][Deferred.await] or [awaitAll].
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt
index e81e5a9..5bd1668 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt
@@ -142,7 +142,7 @@
state as T
public fun getExceptionalResult(state: Any?): Throwable? =
- (state as? CompletedExceptionally)?.exception
+ (state as? CompletedExceptionally)?.cause
public override fun run() {
try {
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt
index 6759e8f..8743d2b 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt
@@ -25,7 +25,7 @@
cause: Throwable?,
job: Job
) : CancellationException {
- val job: Job
+ internal val job: Job
}
internal expect class DispatchException(message: String, cause: Throwable) : RuntimeException
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 7cb17a5..3642688 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -279,6 +279,12 @@
* with a job's exception or cancellation cause or `null`. Otherwise, handler will be invoked once when this
* job is complete.
*
+ * The meaning of `cause` that is passed to the handler:
+ * * Cause is `null` when job has completed normally.
+ * * Cause is an instance of [CancellationException] when job was cancelled _normally_.
+ * **It should not be treated as an error**. In particular, it should not be reported to error logs.
+ * * Otherwise, the job had _failed_.
+ *
* The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the
* registration of this handler and release its memory if its invocation is no longer needed.
* There is no need to dispose the handler after completion of this job. The references to
@@ -297,6 +303,12 @@
* with a job's cancellation cause or `null` unless [invokeImmediately] is set to false.
* Otherwise, handler will be invoked once when this job is cancelled or complete.
*
+ * The meaning of `cause` that is passed to the handler:
+ * * Cause is `null` when job has completed normally.
+ * * Cause is an instance of [CancellationException] when job was cancelled _normally_.
+ * **It should not be treated as an error**. In particular, it should not be reported to error logs.
+ * * Otherwise, the job had _failed_.
+ *
* Invocation of this handler on a transition to a transient _cancelling_ state
* is controlled by [onCancelling] boolean parameter.
* The handler is invoked on invocation of [cancel] when
@@ -641,15 +653,19 @@
private fun isCorrespondinglyCancelled(cancelled: Cancelled, proposedUpdate: Any?): Boolean {
if (proposedUpdate !is Cancelled) return false
// NOTE: equality comparison of causes is performed here by design, see equals of JobCancellationException
- return proposedUpdate.cause == cancelled.cause ||
- proposedUpdate.cause is JobCancellationException && cancelled.cause == null
+ return proposedUpdate.cause == cancelled.cause || proposedUpdate.cause is JobCancellationException
}
private fun createCancelled(cancelled: Cancelled, proposedUpdate: Any?): Cancelled {
if (proposedUpdate !is CompletedExceptionally) return cancelled // not exception -- just use original cancelled
- val exception = proposedUpdate.exception
- if (cancelled.exception == exception) return cancelled // that is the cancelled we need already!
- cancelled.cause?.let { exception.addSuppressedThrowable(it) }
+ val exception = proposedUpdate.cause
+ if (cancelled.cause == exception) return cancelled // that is the cancelled we need already!
+ // todo: We need to rework this logic to keep original cancellation cause in the state and suppress other exceptions
+ // that could have occurred while coroutine is being cancelled.
+ // Do not spam with JCE in suppressed exceptions
+ if (cancelled.cause !is JobCancellationException) {
+ exception.addSuppressedThrowable(cancelled.cause)
+ }
return Cancelled(this, exception)
}
@@ -750,11 +766,11 @@
val state = this.state
return when {
state is Finishing && state.cancelled != null ->
- state.cancelled.exception.toCancellationException("Job is being cancelled")
+ state.cancelled.cause.toCancellationException("Job is being cancelled")
state is Incomplete ->
error("Job was not completed or cancelled yet: $this")
state is CompletedExceptionally ->
- state.exception.toCancellationException("Job has failed")
+ state.cause.toCancellationException("Job has failed")
else -> JobCancellationException("Job has completed normally", null, this)
}
}
@@ -764,9 +780,8 @@
/**
* Returns the cause that signals the completion of this job -- it returns the original
- * [cancel] cause or **`null` if this job had completed
- * normally or was cancelled without a cause**. This function throws
- * [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
+ * [cancel] cause, [JobCancellationException] or **`null` if this job had completed normally**.
+ * This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
* [isCancelled] yet.
*/
protected fun getCompletionCause(): Throwable? {
@@ -1052,7 +1067,7 @@
}
// cancel all children in list on exceptional completion
if (proposedUpdate is CompletedExceptionally)
- child?.cancelChildrenInternal(proposedUpdate.exception)
+ child?.cancelChildrenInternal(proposedUpdate.cause)
// switch to completing state
val cancelled = (state as? Finishing)?.cancelled ?: (proposedUpdate as? Cancelled)
val completing = Finishing(list, cancelled, true)
@@ -1072,7 +1087,7 @@
}
private val Any?.exceptionOrNull: Throwable?
- get() = (this as? CompletedExceptionally)?.exception
+ get() = (this as? CompletedExceptionally)?.cause
private fun firstChild(state: Incomplete) =
state as? Child ?: state.list?.nextChild()
@@ -1224,7 +1239,7 @@
internal fun getCompletedInternal(): Any? {
val state = this.state
check(state !is Incomplete) { "This job has not completed yet" }
- if (state is CompletedExceptionally) throw state.exception
+ if (state is CompletedExceptionally) throw state.cause
return state
}
@@ -1237,7 +1252,7 @@
val state = this.state
if (state !is Incomplete) {
// already complete -- just return result
- if (state is CompletedExceptionally) throw state.exception
+ if (state is CompletedExceptionally) throw state.cause
return state
}
@@ -1251,7 +1266,7 @@
val state = this.state
check(state !is Incomplete)
if (state is CompletedExceptionally)
- cont.resumeWithException(state.exception)
+ cont.resumeWithException(state.cause)
else
cont.resume(state)
})
@@ -1270,7 +1285,7 @@
// already complete -- select result
if (select.trySelect(null)) {
if (state is CompletedExceptionally)
- select.resumeSelectCancellableWithException(state.exception)
+ select.resumeSelectCancellableWithException(state.cause)
else
block.startCoroutineUndispatched(state as T, select.completion)
}
@@ -1292,7 +1307,7 @@
val state = this.state
// Note: await is non-atomic (can be cancelled while dispatched)
if (state is CompletedExceptionally)
- select.resumeSelectCancellableWithException(state.exception)
+ select.resumeSelectCancellableWithException(state.cause)
else
block.startCoroutineCancellable(state as T, select.completion)
}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
index e06323d..4c1ce77 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
@@ -102,7 +102,7 @@
@Suppress("UNCHECKED_CAST")
internal override fun onCompletionInternal(state: Any?, mode: Int) {
if (state is CompletedExceptionally)
- cont.resumeWithExceptionMode(state.exception, mode)
+ cont.resumeWithExceptionMode(state.cause, mode)
else
cont.resumeMode(state as T, mode)
}
@@ -171,7 +171,7 @@
@Suppress("UNCHECKED_CAST")
internal override fun onCompletionInternal(state: Any?, mode: Int) {
if (state is CompletedExceptionally) {
- val exception = state.exception
+ val exception = state.cause
if (exception is TimeoutCancellationException && exception.coroutine === this)
cont.resumeMode(null, mode) else
cont.resumeWithExceptionMode(exception, mode)
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
index fc86b0a..3c32e14 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
@@ -123,7 +123,8 @@
override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
val cause = exceptionally?.cause
val processed = when (exceptionally) {
- is Cancelled -> _channel.cancel(cause) // producer coroutine was cancelled -- cancel channel
+ // producer coroutine was cancelled -- cancel channel, but without cause if it was closed without cause
+ is Cancelled -> _channel.cancel(if (cause is CancellationException) null else cause)
else -> _channel.close(cause) // producer coroutine has completed -- close channel
}
if (!processed && cause != null)
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Undispatched.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Undispatched.kt
index fc8783a..8178d87 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Undispatched.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Undispatched.kt
@@ -85,7 +85,7 @@
return when {
result === COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED
makeCompletingOnce(result, MODE_IGNORE) -> {
- if (result is CompletedExceptionally) throw result.exception else result
+ if (result is CompletedExceptionally) throw result.cause else result
}
else -> COROUTINE_SUSPENDED
}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/AwaitTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/AwaitTest.kt
new file mode 100644
index 0000000..a63a825
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/AwaitTest.kt
@@ -0,0 +1,371 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import kotlin.coroutines.experimental.coroutineContext
+import kotlin.test.*
+
+class AwaitTest : TestBase() {
+
+ @Test
+ fun testAwaitAll() = runTest {
+ expect(1)
+ val d = async(coroutineContext) {
+ expect(3)
+ "OK"
+ }
+
+ val d2 = async(coroutineContext) {
+ yield()
+ expect(4)
+ 1L
+ }
+
+ expect(2)
+ require(d2.isActive && !d2.isCompleted)
+
+ assertEquals(listOf("OK", 1L), awaitAll(d, d2))
+ expect(5)
+
+ require(d.isCompleted && d2.isCompleted)
+ require(!d.isCancelled && !d2.isCancelled)
+ finish(6)
+ }
+
+ @Test
+ fun testAwaitAllLazy() = runTest {
+ expect(1)
+ val d = async(
+ coroutineContext,
+ start = CoroutineStart.LAZY
+ ) { expect(2); 1 }
+ val d2 = async(
+ coroutineContext,
+ start = CoroutineStart.LAZY
+ ) { expect(3); 2 }
+ assertEquals(listOf(1, 2), awaitAll(d, d2))
+ finish(4)
+ }
+
+ @Test
+ fun testAwaitAllTyped() = runTest {
+ val d1 = async(coroutineContext) { 1L }
+ val d2 = async(coroutineContext) { "" }
+ val d3 = async(coroutineContext) { }
+
+ assertEquals(listOf(1L, ""), listOf(d1, d2).awaitAll())
+ assertEquals(listOf(1L, Unit), listOf(d1, d3).awaitAll())
+ assertEquals(listOf("", Unit), listOf(d2, d3).awaitAll())
+ }
+
+ @Test
+ fun testAwaitAllExceptionally() = runTest {
+ expect(1)
+ val d = async(coroutineContext) {
+ expect(3)
+ "OK"
+ }
+
+ val d2 = async(coroutineContext) {
+ yield()
+ throw TestException()
+ }
+
+ val d3 = async(coroutineContext) {
+ expect(4)
+ delay(Long.MAX_VALUE)
+ 1
+ }
+
+ expect(2)
+ try {
+ awaitAll(d, d2, d3)
+ } catch (e: TestException) {
+ expect(5)
+ }
+
+ yield()
+ require(d.isCompleted && d2.isCompletedExceptionally && d3.isActive)
+ d3.cancel()
+ finish(6)
+ }
+
+ @Test
+ fun testAwaitAllMultipleExceptions() = runTest {
+ val d = async(coroutineContext) {
+ expect(2)
+ throw TestException()
+ }
+
+ val d2 = async(coroutineContext) {
+ yield()
+ throw TestException()
+ }
+
+ val d3 = async(coroutineContext) {
+ yield()
+ }
+
+ expect(1)
+ try {
+ awaitAll(d, d2, d3)
+ } catch (e: TestException) {
+ expect(3)
+ }
+
+ finish(4)
+ }
+
+ @Test
+ fun testAwaitAllCancellation() = runTest {
+ val outer = async(coroutineContext) {
+
+ expect(1)
+ val inner = async(coroutineContext) {
+ expect(4)
+ delay(Long.MAX_VALUE)
+ }
+
+ expect(2)
+ awaitAll(inner)
+ expectUnreached()
+ }
+
+ yield()
+ expect(3)
+ yield()
+ require(outer.isActive)
+ outer.cancel()
+ require(outer.isCancelled)
+ finish(5)
+ }
+
+ @Test
+ fun testAwaitAllPartiallyCompleted() = runTest {
+ val d1 = async(coroutineContext) { expect(1); 1 }
+ d1.await()
+ val d2 = async(coroutineContext) { expect(3); 2 }
+ expect(2)
+ assertEquals(listOf(1, 2), awaitAll(d1, d2))
+ require(d1.isCompleted && d2.isCompleted)
+ finish(4)
+ }
+
+ @Test
+ fun testAwaitAllPartiallyCompletedExceptionally() = runTest {
+ val d1 = async(coroutineContext) {
+ expect(1)
+ throw TestException()
+ }
+
+ yield()
+
+ // This job is called after exception propagation
+ val d2 = async(coroutineContext) { expect(4) }
+
+ expect(2)
+ try {
+ awaitAll(d1, d2)
+ expectUnreached()
+ } catch (e: TestException) {
+ expect(3)
+ }
+
+ require(d2.isActive)
+ d2.await()
+ require(d1.isCompleted && d2.isCompleted)
+ finish(5)
+ }
+
+ @Test
+ fun testAwaitAllFullyCompleted() = runTest {
+ val d1 = CompletableDeferred(Unit)
+ val d2 = CompletableDeferred(Unit)
+ val job = async(coroutineContext) { expect(3) }
+ expect(1)
+ awaitAll(d1, d2)
+ expect(2)
+ job.await()
+ finish(4)
+ }
+
+ @Test
+ fun testAwaitOnSet() = runTest {
+ val d1 = CompletableDeferred(Unit)
+ val d2 = CompletableDeferred(Unit)
+ val job = async(coroutineContext) { expect(2) }
+ expect(1)
+ listOf(d1, d2, job).awaitAll()
+ finish(3)
+ }
+
+ @Test
+ fun testAwaitAllFullyCompletedExceptionally() = runTest {
+ val d1 = CompletableDeferred<Unit>(parent = null)
+ .apply { completeExceptionally(TestException()) }
+ val d2 = CompletableDeferred<Unit>(parent = null)
+ .apply { completeExceptionally(TestException()) }
+ val job = async(coroutineContext) { expect(3) }
+ expect(1)
+ try {
+ awaitAll(d1, d2)
+ } catch (e: TestException) {
+ expect(2)
+ }
+
+ job.await()
+ finish(4)
+ }
+
+ @Test
+ fun testAwaitAllSameJobMultipleTimes() = runTest {
+ val d = async(coroutineContext) { "OK" }
+ // Duplicates are allowed though kdoc doesn't guarantee that
+ assertEquals(listOf("OK", "OK", "OK"), awaitAll(d, d, d))
+ }
+
+ @Test
+ fun testAwaitAllSameThrowingJobMultipleTimes() = runTest {
+ val d1 =
+ async(coroutineContext) { throw TestException() }
+ val d2 = async(coroutineContext) { } // do nothing
+
+ try {
+ expect(1)
+ // Duplicates are allowed though kdoc doesn't guarantee that
+ awaitAll(d1, d2, d1, d2)
+ expectUnreached()
+ } catch (e: TestException) {
+ finish(2)
+ }
+ }
+
+ @Test
+ fun testAwaitAllEmpty() = runTest {
+ expect(1)
+ assertEquals(emptyList(), awaitAll<Unit>())
+ assertEquals(emptyList(), emptyList<Deferred<Unit>>().awaitAll())
+ finish(2)
+ }
+
+ // joinAll
+
+ @Test
+ fun testJoinAll() = runTest {
+ val d1 = launch(coroutineContext) { expect(2) }
+ val d2 = async(coroutineContext) {
+ expect(3)
+ "OK"
+ }
+ val d3 = launch(coroutineContext) { expect(4) }
+
+ expect(1)
+ joinAll(d1, d2, d3)
+ finish(5)
+ }
+
+ @Test
+ fun testJoinAllLazy() = runTest {
+ expect(1)
+ val d = async(
+ coroutineContext,
+ start = CoroutineStart.LAZY
+ ) { expect(2) }
+ val d2 = launch(
+ coroutineContext,
+ start = CoroutineStart.LAZY
+ ) { expect(3) }
+ joinAll(d, d2)
+ finish(4)
+ }
+
+ @Test
+ fun testJoinAllExceptionally() = runTest {
+ val d1 = launch(coroutineContext) {
+ expect(2)
+ }
+ val d2 = async(coroutineContext) {
+ expect(3)
+ throw TestException()
+ }
+ val d3 = async(coroutineContext) {
+ expect(4)
+ }
+
+ expect(1)
+ joinAll(d1, d2, d3)
+ finish(5)
+ }
+
+ @Test
+ fun testJoinAllCancellation() = runTest {
+ val outer = launch(coroutineContext) {
+ expect(2)
+ val inner = launch(coroutineContext) {
+ expect(3)
+ delay(Long.MAX_VALUE)
+ }
+
+ joinAll(inner)
+ expectUnreached()
+ }
+
+ expect(1)
+ yield()
+ require(outer.isActive)
+ yield()
+ outer.cancel()
+ outer.join()
+ finish(4)
+ }
+
+ @Test
+ fun testJoinAllAlreadyCompleted() = runTest {
+ val job = launch(coroutineContext) {
+ expect(1)
+ }
+
+ job.join()
+ expect(2)
+
+ joinAll(job)
+ finish(3)
+ }
+
+ @Test
+ fun testJoinAllEmpty() = runTest {
+ expect(1)
+ joinAll()
+ listOf<Job>().joinAll()
+ finish(2)
+ }
+
+ @Test
+ fun testJoinAllSameJob() = runTest {
+ val job = launch(coroutineContext) { }
+ joinAll(job, job, job)
+ }
+
+ @Test
+ fun testJoinAllSameJobExceptionally() = runTest {
+ val job =
+ async(coroutineContext) { throw TestException() }
+ joinAll(job, job, job)
+ }
+
+ private class TestException : Exception()
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/DelayTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/DelayTest.kt
new file mode 100644
index 0000000..9904d3d
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/DelayTest.kt
@@ -0,0 +1,61 @@
+
+@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
+
+package kotlinx.coroutines.experimental
+
+import kotlinx.coroutines.experimental.timeunit.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class DelayTest : TestBase() {
+
+ @Test
+ fun testCancellation() = runTest(expected = {it is JobCancellationException}) {
+ runAndCancel(3600, TimeUnit.SECONDS)
+ }
+
+ @Test
+ fun testMaxLongValue()= runTest(expected = {it is JobCancellationException}) {
+ runAndCancel(Long.MAX_VALUE)
+ }
+
+ @Test
+ fun testMaxIntValue()= runTest(expected = {it is JobCancellationException}) {
+ runAndCancel(Int.MAX_VALUE.toLong())
+ }
+
+ @Test
+ fun testOverflowOnUnitConversion()= runTest(expected = {it is JobCancellationException}) {
+ runAndCancel(Long.MAX_VALUE, TimeUnit.SECONDS)
+ }
+
+ @Test
+ fun testRegularDelay() = runTest {
+ val deferred = async(coroutineContext) {
+ expect(2)
+ delay(1)
+ expect(3)
+ }
+
+ expect(1)
+ yield()
+ deferred.await()
+ finish(4)
+ }
+
+ private suspend fun runAndCancel(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
+ expect(1)
+ val deferred = async(coroutineContext) {
+ expect(2)
+ delay(time, unit)
+ expectUnreached()
+ }
+
+ yield()
+ expect(3)
+ require(deferred.isActive)
+ deferred.cancel()
+ finish(4)
+ deferred.await()
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
index 84da40f..b70a7a0 100644
--- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
@@ -21,7 +21,6 @@
import kotlin.test.*
class ArrayBroadcastChannelTest : TestBase() {
-
@Test
fun testBasic() = runTest {
expect(1)
@@ -161,7 +160,7 @@
sub.consumeEach {
check(it == ++expected)
if (it == 2) {
- sub.close()
+ sub.cancel()
}
}
check(expected == 2)
@@ -172,8 +171,18 @@
val channel = BroadcastChannel<Int>(1)
val sub = channel.openSubscription()
assertFalse(sub.isClosedForReceive)
- sub.close()
+ sub.cancel()
assertTrue(sub.isClosedForReceive)
sub.receive()
}
+
+ @Test
+ fun testCancelWithCause() = runTest({ it is TestException }) {
+ val channel = BroadcastChannel<Int>(1)
+ val subscription = channel.openSubscription()
+ subscription.cancel(TestException())
+ subscription.receiveOrNull()
+ }
+
+ private class TestException : Exception()
}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
index 61fdaef..170e579 100644
--- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
@@ -21,7 +21,6 @@
import kotlin.test.*
class ArrayChannelTest : TestBase() {
-
@Test
fun testSimple() = runTest {
val q = ArrayChannel<Int>(1)
@@ -151,4 +150,13 @@
check(q.receiveOrNull() == null)
finish(12)
}
+
+ @Test
+ fun testCancelWithCause() = runTest({ it is TestException }) {
+ val channel = ArrayChannel<Int>(5)
+ channel.cancel(TestException())
+ channel.receiveOrNull()
+ }
+
+ private class TestException : Exception()
}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
index 1fd7413..2b8775b 100644
--- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
@@ -21,7 +21,6 @@
import kotlin.test.*
class ConflatedChannelTest : TestBase() {
-
@Test
fun testBasicConflationOfferPoll() {
val q = ConflatedChannel<Int>()
@@ -90,4 +89,13 @@
check(q.receiveOrNull() == null)
finish(2)
}
-}
\ No newline at end of file
+
+ @Test
+ fun testCancelWithCause() = runTest({ it is TestException }) {
+ val channel = ConflatedChannel<Int>()
+ channel.cancel(TestException())
+ channel.receiveOrNull()
+ }
+
+ private class TestException : Exception()
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
index 897801e..2bf376f 100644
--- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
@@ -16,11 +16,10 @@
package kotlinx.coroutines.experimental.channels
-import kotlinx.coroutines.experimental.TestBase
+import kotlinx.coroutines.experimental.*
import kotlin.test.*
class LinkedListChannelTest : TestBase() {
-
@Test
fun testBasic() = runTest {
val c = LinkedListChannel<Int>()
@@ -46,4 +45,13 @@
check(q.isClosedForReceive)
check(q.receiveOrNull() == null)
}
+
+ @Test
+ fun testCancelWithCause() = runTest({ it is TestException }) {
+ val channel = LinkedListChannel<Int>()
+ channel.cancel(TestException())
+ channel.receiveOrNull()
+ }
+
+ private class TestException : Exception()
}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
index 522f6d6..41ba678 100644
--- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
@@ -21,7 +21,6 @@
import kotlin.test.*
class ProduceTest : TestBase() {
-
@Test
fun testBasic() = runTest {
val c = produce(coroutineContext) {
@@ -41,7 +40,7 @@
}
@Test
- fun testCancel() = runTest {
+ fun testCancelWithoutCause() = runTest {
val c = produce(coroutineContext) {
expect(2)
send(1)
@@ -60,7 +59,39 @@
expect(4)
c.cancel()
expect(5)
- check(c.receiveOrNull() == null)
+ assertNull(c.receiveOrNull())
expect(6)
}
+
+ @Test
+ fun testCancelWithCause() = runTest {
+ val c = produce(coroutineContext) {
+ expect(2)
+ send(1)
+ expect(3)
+ try {
+ send(2) // will get cancelled
+ } catch (e: Exception) {
+ finish(6)
+ check(e is JobCancellationException && e.job == coroutineContext[Job])
+ check(e.cause is TestException)
+ throw e
+ }
+ expectUnreached()
+ }
+
+ expect(1)
+ check(c.receive() == 1)
+ expect(4)
+ c.cancel(TestException())
+
+ try {
+ assertNull(c.receiveOrNull())
+ expectUnreached()
+ } catch (e: TestException) {
+ expect(5)
+ }
+ }
+
+ private class TestException : Exception()
}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
index 6e1b2c3..2b1b987 100644
--- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
@@ -21,7 +21,6 @@
import kotlin.test.*
class RendezvousChannelTest : TestBase() {
-
@Test
fun testSimple() = runTest {
val q = RendezvousChannel<Int>()
@@ -289,4 +288,13 @@
check(q.receiveOrNull() == null)
finish(12)
}
+
+ @Test
+ fun testCancelWithCause() = runTest({ it is TestException }) {
+ val channel = RendezvousChannel<Int>()
+ channel.cancel(TestException())
+ channel.receiveOrNull()
+ }
+
+ private class TestException : Exception()
}
diff --git a/core/kotlinx-coroutines-core/README.md b/core/kotlinx-coroutines-core/README.md
index 1e1c722..c32be50 100644
--- a/core/kotlinx-coroutines-core/README.md
+++ b/core/kotlinx-coroutines-core/README.md
@@ -39,13 +39,19 @@
Top-level suspending functions:
-| **Name** | **Description**
-| ------------------- | ---------------
-| [delay] | Non-blocking sleep
-| [yield] | Yields thread in single-threaded dispatchers
-| [withContext] | Switches to a different context
-| [withTimeout] | Set execution time-limit with exception on timeout
-| [withTimeoutOrNull] | Set execution time-limit will null result on timeout
+| **Name** | **Description**
+| ------------------- | ---------------
+| [delay] | Non-blocking sleep
+| [yield] | Yields thread in single-threaded dispatchers
+| [withContext] | Switches to a different context
+| [withTimeout] | Set execution time-limit with exception on timeout
+| [withTimeoutOrNull] | Set execution time-limit will null result on timeout
+| [awaitAll] | Awaits for successful completion of all given jobs or exceptional completion of any
+| [joinAll] | Joins on all given jobs
+
+Cancellation support for user-defined suspending functions is available with [suspendCancellableCoroutine]
+helper function. [NonCancellable] job object is provided to suppress cancellation with
+`run(NonCancellable) {...}` block of code.
[Select][kotlinx.coroutines.experimental.selects.select] expression waits for the result of multiple suspending functions simultaneously:
@@ -59,10 +65,6 @@
| [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | [onLock][kotlinx.coroutines.experimental.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.experimental.sync.Mutex.tryLock]
| none | [delay] | [onTimeout][kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout] | none
-Cancellation support for user-defined suspending functions is available with [suspendCancellableCoroutine]
-helper function. [NonCancellable] job object is provided to suppress cancellation with
-`run(NonCancellable) {...}` block of code.
-
This module provides debugging facilities for coroutines (run JVM with `-ea` or `-Dkotlinx.coroutines.debug` options)
and [newCoroutineContext] function to write user-defined coroutine builders that work with these
debugging facilities.
@@ -113,12 +115,14 @@
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-context.html
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout-or-null.html
+[awaitAll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/await-all.html
+[joinAll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/join-all.html
+[suspendCancellableCoroutine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/suspend-cancellable-coroutine.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html
[Job.onJoin]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/on-join.html
[Job.isCompleted]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/is-completed.html
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
[Deferred.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/on-await.html
-[suspendCancellableCoroutine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/suspend-cancellable-coroutine.html
[newCoroutineContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/new-coroutine-context.html
<!--- INDEX kotlinx.coroutines.experimental.sync -->
[kotlinx.coroutines.experimental.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 7d20709..d4ef498 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -94,7 +94,7 @@
timeSource.unregisterTimeLoopThread()
// now return result
val state = this.state
- (state as? CompletedExceptionally)?.let { throw it.exception }
+ (state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}
}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
index 820d87a..d8eeedf 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
@@ -41,7 +41,7 @@
*/
public const val DEBUG_PROPERTY_VALUE_OFF = "off"
-private val DEBUG = run {
+internal val DEBUG = run {
val value = try { System.getProperty(DEBUG_PROPERTY_NAME) }
catch (e: SecurityException) { null }
when (value) {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
index 6d6685b..2cb2433 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
@@ -18,9 +18,10 @@
import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.internal.*
-import kotlinx.coroutines.experimental.timeunit.TimeUnit
+import kotlinx.coroutines.experimental.timeunit.*
import java.util.concurrent.locks.*
import kotlin.coroutines.experimental.*
+import kotlin.jvm.*
/**
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
@@ -303,6 +304,12 @@
time: Long, timeUnit: TimeUnit,
private val cont: CancellableContinuation<Unit>
) : DelayedTask(time, timeUnit) {
+
+ init {
+ // Note that this operation isn't lock-free, but very short
+ cont.disposeOnCompletion(this)
+ }
+
override fun run() {
with(cont) { resumeUndispatched(Unit) }
}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt
index 7d7c532..462d816 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt
@@ -16,8 +16,6 @@
package kotlinx.coroutines.experimental
-import java.util.concurrent.*
-
/**
* This exception gets thrown if an exception is caught while processing [CompletionHandler] invocation for [Job].
*/
@@ -42,12 +40,26 @@
public actual class JobCancellationException public actual constructor(
message: String,
cause: Throwable?,
- /**
- * The job that was cancelled.
- */
- public actual val job: Job
+ @JvmField internal actual val job: Job
) : CancellationException(message) {
- init { if (cause != null) initCause(cause) }
+
+ init {
+ if (cause != null) initCause(cause)
+ }
+
+ override fun fillInStackTrace(): Throwable {
+ if (DEBUG) {
+ return super.fillInStackTrace()
+ }
+
+ /*
+ * In non-debug mode we don't want to have a stacktrace on every cancellation/close,
+ * parent job reference is enough. Stacktrace of JCE is not needed most of the time (e.g., it is not logged)
+ * and hurts performance.
+ */
+ return this
+ }
+
override fun toString(): String = "${super.toString()}; job=$job"
override fun equals(other: Any?): Boolean =
other === this ||
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/AwaitStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/AwaitStressTest.kt
new file mode 100644
index 0000000..d5be824
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/AwaitStressTest.kt
@@ -0,0 +1,136 @@
+package kotlinx.coroutines.experimental
+
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+
+class AwaitStressTest : TestBase() {
+
+ private class TestException : Exception() {
+ override fun fillInStackTrace(): Throwable = this
+ }
+
+ private val iterations = 50_000 * stressTestMultiplier
+ private val pool = newFixedThreadPoolContext(4, "AwaitStressTest")
+
+ @After
+ fun tearDown() {
+ pool.close()
+ }
+
+ @Test
+ fun testMultipleExceptions() = runTest {
+
+ repeat(iterations) {
+ val barrier = CyclicBarrier(4)
+
+ val d1 = async(pool) {
+ barrier.await()
+ throw TestException()
+ }
+
+ val d2 = async(pool) {
+ barrier.await()
+ throw TestException()
+ }
+
+ val d3 = async(pool) {
+ barrier.await()
+ 1L
+ }
+
+ try {
+ barrier.await()
+ awaitAll(d1, d2, d3)
+ expectUnreached()
+ } catch (e: TestException) {
+ // Expected behaviour
+ }
+
+ barrier.reset()
+ }
+ }
+
+ @Test
+ fun testAwaitAll() = runTest {
+ val barrier = CyclicBarrier(3)
+
+ repeat(iterations) {
+ val d1 = async(pool) {
+ barrier.await()
+ 1L
+ }
+
+ val d2 = async(pool) {
+ barrier.await()
+ 2L
+ }
+
+ barrier.await()
+ awaitAll(d1, d2)
+ require(d1.isCompleted && d2.isCompleted)
+ barrier.reset()
+ }
+ }
+
+ @Test
+ fun testConcurrentCancellation() = runTest {
+ var cancelledOnce = false
+ repeat(iterations) {
+ val barrier = CyclicBarrier(3)
+
+ val d1 = async(pool) {
+ barrier.await()
+ delay(10_000)
+ yield()
+ }
+
+ val d2 = async(pool) {
+ barrier.await()
+ d1.cancel()
+ }
+
+ barrier.await()
+ try {
+ awaitAll(d1, d2)
+ } catch (e: JobCancellationException) {
+ cancelledOnce = true
+ }
+ }
+
+ require(cancelledOnce) { "Cancellation exception wasn't properly caught" }
+ }
+
+ @Test
+ fun testMutatingCollection() = runTest {
+ val barrier = CyclicBarrier(4)
+
+ repeat(iterations) {
+ // thread-safe collection that we are going to modify
+ val deferreds = CopyOnWriteArrayList<Deferred<Long>>()
+
+ deferreds += async(pool) {
+ barrier.await()
+ 1L
+ }
+
+ deferreds += async(pool) {
+ barrier.await()
+ 2L
+ }
+
+ deferreds += async(pool) {
+ barrier.await()
+ deferreds.removeAt(2)
+ 3L
+ }
+
+ val allJobs = ArrayList(deferreds)
+ barrier.await()
+ val results = deferreds.awaitAll() // shouldn't hang
+ check(results == listOf(1L, 2L, 3L) || results == listOf(1L, 2L))
+ allJobs.awaitAll()
+ barrier.reset()
+ }
+ }
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DefaultExecutorStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DefaultExecutorStressTest.kt
new file mode 100644
index 0000000..0291866
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DefaultExecutorStressTest.kt
@@ -0,0 +1,37 @@
+package kotlinx.coroutines.experimental
+
+import org.junit.*
+import java.util.concurrent.*
+import kotlin.coroutines.experimental.*
+
+class DefaultExecutorStressTest : TestBase() {
+
+ @Test
+ fun testDelay() = runTest {
+ val iterations = 100_000 * stressTestMultiplier
+
+ val ctx = DefaultExecutor + coroutineContext
+ expect(1)
+ var expected = 1
+ repeat(iterations) {
+ expect(++expected)
+ val deferred = async(ctx) {
+ expect(++expected)
+ val largeArray = IntArray(10_000) { it }
+ delay(Long.MAX_VALUE, TimeUnit.NANOSECONDS)
+ println(largeArray) // consume to avoid DCE, actually unreachable
+ }
+
+ expect(++expected)
+ yield()
+ deferred.cancel()
+ try {
+ deferred.await()
+ } catch (e: JobCancellationException) {
+ expect(++expected)
+ }
+ }
+
+ finish(2 + iterations * 4)
+ }
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DelayTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DelayJvmTest.kt
similarity index 98%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DelayTest.kt
rename to core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DelayJvmTest.kt
index 764f736..c21d4f7 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DelayTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/DelayJvmTest.kt
@@ -23,7 +23,7 @@
import java.util.concurrent.Executors
import kotlin.coroutines.experimental.*
-class DelayTest : TestBase() {
+class DelayJvmTest : TestBase() {
/**
* Test that delay works properly in contexts with custom [ContinuationInterceptor]
*/
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ActorTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ActorTest.kt
index 1494d2f..268ae31 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ActorTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ActorTest.kt
@@ -20,13 +20,24 @@
import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*
+import org.junit.runner.*
+import org.junit.runners.*
+import java.io.*
import kotlin.coroutines.experimental.*
-class ActorTest : TestBase() {
+@RunWith(Parameterized::class)
+class ActorTest(private val capacity: Int) : TestBase() {
+
+ companion object {
+ @Parameterized.Parameters(name = "Capacity: {0}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> = listOf(0, 1, Channel.UNLIMITED, Channel.CONFLATED).map { arrayOf<Any>(it) }
+ }
+
@Test
- fun testEmpty() = runBlocking<Unit> {
+ fun testEmpty() = runBlocking {
expect(1)
- val actor = actor<String>(coroutineContext) {
+ val actor = actor<String>(coroutineContext, capacity) {
expect(3)
}
actor as Job // type assertion
@@ -42,9 +53,9 @@
}
@Test
- fun testOne() = runBlocking<Unit> {
+ fun testOne() = runBlocking {
expect(1)
- val actor = actor<String>(coroutineContext) {
+ val actor = actor<String>(coroutineContext, capacity) {
expect(3)
assertThat(receive(), IsEqual("OK"))
expect(6)
@@ -68,4 +79,71 @@
assertThat(actor.isClosedForSend, IsEqual(true))
finish(7)
}
-}
\ No newline at end of file
+
+ @Test
+ fun testCloseWithoutCause() = runTest {
+ val actor = actor<Int>(coroutineContext, capacity) {
+ val element = channel.receiveOrNull()
+ expect(2)
+ assertEquals(42, element)
+ val next = channel.receiveOrNull()
+ assertNull(next)
+ expect(3)
+ }
+
+ expect(1)
+ actor.send(42)
+ yield()
+ actor.close()
+ yield()
+ finish(4)
+ }
+
+ @Test
+ fun testCloseWithCause() = runTest {
+ val actor = actor<Int>(coroutineContext, capacity) {
+ val element = channel.receiveOrNull()
+ expect(2)
+ require(element!! == 42)
+ try {
+ val next = channel.receiveOrNull()
+ } catch (e: IOException) {
+ expect(3)
+ }
+ }
+
+ expect(1)
+ actor.send(42)
+ yield()
+ actor.close(IOException())
+ yield()
+ finish(4)
+ }
+
+ @Test
+ fun testCancelEnclosingJob() = runTest {
+ val job = async(coroutineContext) {
+ actor<Int>(coroutineContext, capacity) {
+ expect(1)
+ channel.receiveOrNull()
+ expectUnreached()
+ }
+ }
+
+ yield()
+ yield()
+
+ expect(2)
+ yield()
+ job.cancel()
+
+ try {
+ job.await()
+ expectUnreached()
+ } catch (e: JobCancellationException) {
+ assertTrue(e.message?.contains("Job was cancelled normally") ?: false)
+ }
+
+ finish(3)
+ }
+}
diff --git a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteChannelCloseTest.kt b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteChannelCloseTest.kt
new file mode 100644
index 0000000..91f5c8b
--- /dev/null
+++ b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteChannelCloseTest.kt
@@ -0,0 +1,140 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException
+import org.junit.After
+import org.junit.Test
+import java.io.IOException
+import kotlin.coroutines.experimental.coroutineContext
+
+class ByteChannelCloseTest : TestBase() {
+
+ private val from = ByteChannel(true)
+ private val to = ByteChannel(true)
+
+ @After
+ fun tearDown() {
+ from.close(CancellationException())
+ to.close(CancellationException())
+ }
+
+ @Test
+ fun testCloseWithCause() = runBlocking {
+ expect(1)
+
+ launch(coroutineContext) {
+ expect(2)
+
+ try {
+ from.copyAndClose(to) // should suspend and then throw IOException
+ expectUnreached()
+ } catch (expected: IOException) {
+ expect(4)
+ }
+ }
+
+ yield()
+ expect(3)
+
+ from.close(IOException())
+ yield()
+
+ expect(5)
+
+ try {
+ to.readInt()
+ expectUnreached()
+ } catch (expected: IOException) {
+ finish(6)
+ }
+ }
+
+ @Test
+ fun testCancelWithCause() = runBlocking {
+ expect(1)
+
+ launch(coroutineContext) {
+ expect(2)
+
+ try {
+ from.copyAndClose(to) // should suspend and then throw IOException
+ expectUnreached()
+ } catch (expected: IOException) {
+ expect(4)
+ }
+ }
+
+ yield()
+ expect(3)
+
+ from.cancel(IOException())
+ yield()
+
+ expect(5)
+
+ try {
+ to.readInt()
+ expectUnreached()
+ } catch (expected: IOException) {
+ finish(6)
+ }
+ }
+
+ @Test
+ fun testCloseWithoutCause() = runBlocking {
+ expect(1)
+
+ launch(coroutineContext) {
+ expect(2)
+ from.copyAndClose(to)
+ expect(4)
+ }
+
+ yield()
+ expect(3)
+
+ from.close()
+ yield()
+
+ expect(5)
+ require(to.isClosedForWrite)
+
+ try {
+ to.readInt()
+ expectUnreached()
+ } catch (expected: ClosedReceiveChannelException) {
+ finish(6)
+ }
+ }
+
+ @Test
+ fun testCancelWithoutCause() = runBlocking {
+ expect(1)
+
+ launch(coroutineContext) {
+ expect(2)
+ try {
+ from.copyAndClose(to)
+ expectUnreached()
+ } catch (e: CancellationException) {
+ expect(4)
+ }
+ }
+
+ yield()
+ expect(3)
+
+ from.cancel()
+ yield()
+
+ expect(5)
+ require(to.isClosedForWrite)
+
+ try {
+ to.readInt()
+ expectUnreached()
+ } catch (expected: CancellationException) {
+ finish(6)
+ }
+ }
+}
diff --git a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt
index 504a103..00abc7c 100644
--- a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt
+++ b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt
@@ -49,39 +49,6 @@
}
@Test
- fun failurePropagation() = runBlocking {
- expect(1)
-
- launch(coroutineContext) {
- expect(2)
-
- try {
- from.copyAndClose(to) // should suspend and then throw IOException
- fail("Should rethrow exception")
- } catch (expected: IOException) {
- }
-
- expect(4)
- }
-
- yield()
- expect(3)
-
- from.close(IOException())
- yield()
-
- expect(5)
-
- try {
- to.readInt()
- fail("Should throw exception")
- } catch (expected: IOException) {
- }
-
- finish(6)
- }
-
- @Test
fun copyLimitedTest() = runBlocking {
expect(1)
diff --git a/js/kotlinx-coroutines-core-js/README.md b/js/kotlinx-coroutines-core-js/README.md
index 552d4e0..4933712 100644
--- a/js/kotlinx-coroutines-core-js/README.md
+++ b/js/kotlinx-coroutines-core-js/README.md
@@ -10,7 +10,6 @@
| [async] | [Deferred] | [CoroutineScope] | Returns a single value with the future result
| [produce][kotlinx.coroutines.experimental.channels.produce] | [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [ProducerScope][kotlinx.coroutines.experimental.channels.ProducerScope] | Produces a stream of elements
-
Coroutine dispatchers implementing [CoroutineDispatcher]:
| **Name** | **Description**
@@ -32,16 +31,17 @@
| [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | Mutual exclusion
| [Channel][kotlinx.coroutines.experimental.channels.Channel] | [send][kotlinx.coroutines.experimental.channels.SendChannel.send], [receive][kotlinx.coroutines.experimental.channels.ReceiveChannel.receive] | Communication channel (aka queue or exchanger)
-
Top-level suspending functions:
-| **Name** | **Description**
-| ------------------- | ---------------
-| [delay] | Non-blocking sleep
-| [yield] | Yields thread in single-threaded dispatchers
-| [withContext] | Switches to a different context
-| [withTimeout] | Set execution time-limit with exception on timeout
-| [withTimeoutOrNull] | Set execution time-limit will null result on timeout
+| **Name** | **Description**
+| ------------------- | ---------------
+| [delay] | Non-blocking sleep
+| [yield] | Yields thread in single-threaded dispatchers
+| [withContext] | Switches to a different context
+| [withTimeout] | Set execution time-limit with exception on timeout
+| [withTimeoutOrNull] | Set execution time-limit will null result on timeout
+| [awaitAll] | Awaits for successful completion of all given jobs or exceptional completion of any
+| [joinAll] | Joins on all given jobs
Cancellation support for user-defined suspending functions is available with [suspendCancellableCoroutine]
helper function. [NonCancellable] job object is provided to suppress cancellation with
@@ -59,7 +59,6 @@
| [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | [onLock][kotlinx.coroutines.experimental.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.experimental.sync.Mutex.tryLock]
| none | [delay] | [onTimeout][kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout] | none
-
# Package kotlinx.coroutines.experimental
General-purpose coroutine builders, contexts, and helper functions.
@@ -81,6 +80,8 @@
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-context.html
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout-or-null.html
+[awaitAll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/await-all.html
+[joinAll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/join-all.html
[suspendCancellableCoroutine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/suspend-cancellable-coroutine.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html
[Job.onJoin]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/on-join.html
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt
index 6894ac0..f73467e 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt
@@ -40,10 +40,7 @@
public actual class JobCancellationException public actual constructor(
message: String,
public override val cause: Throwable?,
- /**
- * The job that was cancelled.
- */
- public actual val job: Job
+ internal actual val job: Job
) : CancellationException(message.withCause(cause)) {
override fun toString(): String = "${super.toString()}; job=$job"
override fun equals(other: Any?): Boolean =
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt
index 10e072f..080ac46 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt
@@ -26,7 +26,9 @@
}
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
- setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.toIntMillis(unit))
+ val handle = setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.toIntMillis(unit))
+ // Actually on cancellation, but clearTimeout is idempotent
+ continuation.invokeOnCompletion { clearTimeout(handle) }
}
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
diff --git a/knit/src/Knit.kt b/knit/src/Knit.kt
index 566c80e..275fc28 100644
--- a/knit/src/Knit.kt
+++ b/knit/src/Knit.kt
@@ -410,8 +410,8 @@
fun findModuleRootDir(name: String): String =
moduleRoots
- .map { it + "/" + name }
- .firstOrNull { File(it + "/" + moduleMarker).exists() }
+ .map { "$it/$name" }
+ .firstOrNull { File("$it/$moduleMarker").exists() }
?: throw IllegalArgumentException("Module $name is not found in any of the module root dirs")
data class ApiIndexKey(
@@ -419,26 +419,21 @@
val pkg: String
)
-val apiIndexCache: MutableMap<ApiIndexKey, Map<String, String>> = HashMap()
+val apiIndexCache: MutableMap<ApiIndexKey, Map<String, List<String>>> = HashMap()
val REF_LINE_REGEX = Regex("<a href=\"([a-z/.\\-]+)\">([a-zA-z.]+)</a>")
val INDEX_HTML = "/index.html"
val INDEX_MD = "/index.md"
val FUNCTIONS_SECTION_HEADER = "### Functions"
-val AMBIGUOUS = "#AMBIGUOUS: "
-
-fun HashMap<String,String>.putUnambiguous(key: String, value: String) {
+fun HashMap<String, MutableList<String>>.putUnambiguous(key: String, value: String) {
val oldValue = this[key]
- val putVal =
- if (oldValue != null && oldValue != value) {
- when {
- oldValue.contains("[$value]") -> oldValue
- oldValue.startsWith(AMBIGUOUS) -> "$oldValue; [$value]"
- else -> "$AMBIGUOUS[$oldValue]; [$value]"
- }
- } else value
- put(key, putVal)
+ if (oldValue != null) {
+ oldValue.add(value)
+ put(key, oldValue)
+ } else {
+ put(key, mutableListOf(value))
+ }
}
fun loadApiIndex(
@@ -446,10 +441,10 @@
path: String,
pkg: String,
namePrefix: String = ""
-): Map<String, String>? {
+): Map<String, MutableList<String>>? {
val fileName = docsRoot + "/" + path + INDEX_MD
val visited = mutableSetOf<String>()
- val map = HashMap<String,String>()
+ val map = HashMap<String, MutableList<String>>()
var inFunctionsSection = false
File(fileName).withLineNumberReader<LineNumberReader>(::LineNumberReader) {
while (true) {
@@ -499,11 +494,12 @@
while (it.hasNext()) {
val refName = it.next()
val refLink = map[refName] ?: continue
- if (refLink.startsWith(AMBIGUOUS)) {
- println("WARNING: Ambiguous reference to [$refName]: ${refLink.substring(AMBIGUOUS.length)}")
- continue
+ if (refLink.size > 1) {
+ println("INFO: Ambiguous reference to [$refName]: $refLink, taking the shortest one")
}
- indexList += "[$refName]: $siteRoot/$refLink"
+
+ val link = refLink.minBy { it.length }
+ indexList += "[$refName]: $siteRoot/$link"
it.remove()
}
return indexList
diff --git a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt
index 63fa447..ba53496 100644
--- a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt
@@ -167,7 +167,7 @@
_nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
val cause = getCompletionCause()
try {
- if (cause != null)
+ if (cause != null && cause !is CancellationException)
subscriber.onError(cause)
else
subscriber.onComplete()
diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt
index e482de2..f1fd15a 100644
--- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt
@@ -168,7 +168,7 @@
_nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
val cause = getCompletionCause()
try {
- if (cause != null)
+ if (cause != null && cause !is CancellationException)
subscriber.onError(cause)
else
subscriber.onCompleted()
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt
index 0251495..55f1e36 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt
@@ -158,7 +158,7 @@
_signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
val cause = getCompletionCause()
try {
- if (cause != null)
+ if (cause != null && cause !is CancellationException)
subscriber.onError(cause)
else
subscriber.onComplete()