Flow scope (#1227)
* Introducing flowScope, builder necessary for creating cancellation-transparent flow operators
* Incorporate flow scope into flow operators
Fixes #1218
Fixes #1128
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index 8ff82c5..ab51a2d 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -381,7 +381,6 @@
public fun fold (Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
public final fun getCancellationException ()Ljava/util/concurrent/CancellationException;
- protected fun getCancelsParent ()Z
public fun getChildJobCancellationCause ()Ljava/util/concurrent/CancellationException;
public final fun getChildren ()Lkotlin/sequences/Sequence;
protected final fun getCompletionCause ()Ljava/lang/Throwable;
@@ -396,6 +395,7 @@
public final fun isCancelled ()Z
public final fun isCompleted ()Z
public final fun isCompletedExceptionally ()Z
+ protected fun isScopedCoroutine ()Z
public final fun join (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
protected fun onCancelling (Ljava/lang/Throwable;)V
diff --git a/kotlinx-coroutines-core/common/src/Exceptions.common.kt b/kotlinx-coroutines-core/common/src/Exceptions.common.kt
index e8c2f5e..62b6ba4 100644
--- a/kotlinx-coroutines-core/common/src/Exceptions.common.kt
+++ b/kotlinx-coroutines-core/common/src/Exceptions.common.kt
@@ -23,7 +23,7 @@
internal val job: Job
}
-internal expect class CoroutinesInternalError(message: String, cause: Throwable) : Error
+internal class CoroutinesInternalError(message: String, cause: Throwable) : Error(message, cause)
internal expect fun Throwable.addSuppressedThrowable(other: Throwable)
// For use in tests
diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt
index 4963c37..d8b6b92 100644
--- a/kotlinx-coroutines-core/common/src/JobSupport.kt
+++ b/kotlinx-coroutines-core/common/src/JobSupport.kt
@@ -319,6 +319,31 @@
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}
+ /**
+ * The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
+ * Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
+ *
+ * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
+ * may leak to the [CoroutineExceptionHandler].
+ */
+ private fun cancelParent(cause: Throwable): Boolean {
+ /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
+ * This allow parent to cancel its children (normally) without being cancelled itself, unless
+ * child crashes and produce some other exception during its completion.
+ */
+ val isCancellation = cause is CancellationException
+ val parent = parentHandle
+ // No parent -- ignore CE, report other exceptions.
+ if (parent === null || parent === NonDisposableHandle) {
+ return isCancellation
+ }
+
+ // Is scoped coroutine -- don't propagate, will be rethrown
+ if (isScopedCoroutine) return isCancellation
+ // Notify parent but don't forget to check cancellation
+ return parent.childCancelled(cause) || isCancellation
+ }
+
private fun NodeList.notifyCompletion(cause: Throwable?) =
notifyHandlers<JobNode<*>>(this, cause)
@@ -594,21 +619,29 @@
cancelImpl(parentJob)
}
- // Child was cancelled with cause
- // It is overridden in supervisor implementations to ignore child cancellation
- public open fun childCancelled(cause: Throwable): Boolean =
- cancelImpl(cause) && handlesException
+ /**
+ * Child was cancelled with a cause.
+ * In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
+ * It is overridden in supervisor implementations to completely ignore any child cancellation.
+ * Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
+ *
+ * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
+ * may leak to the [CoroutineExceptionHandler].
+ */
+ public open fun childCancelled(cause: Throwable): Boolean {
+ if (cause is CancellationException) return true
+ return cancelImpl(cause) && handlesException
+ }
/**
* Makes this [Job] cancelled with a specified [cause].
* It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
*/
- public fun cancelCoroutine(cause: Throwable?) =
- cancelImpl(cause)
+ public fun cancelCoroutine(cause: Throwable?) = cancelImpl(cause)
// cause is Throwable or ParentJob when cancelChild was invoked
// returns true is exception was handled, false otherwise
- private fun cancelImpl(cause: Any?): Boolean {
+ internal fun cancelImpl(cause: Any?): Boolean {
if (onCancelComplete) {
// make sure it is completing, if cancelMakeCompleting returns true it means it had make it
// completing and had recorded exception
@@ -912,14 +945,12 @@
protected open fun onCancelling(cause: Throwable?) {}
/**
- * When this function returns `true` the parent is cancelled on cancellation of this job.
- * Note that [CancellationException] is considered "normal" and parent is not cancelled when child produces it.
- * This allows parent to cancel its children (normally) without being cancelled itself, unless
- * child crashes and produce some other exception during its completion.
- *
- * @suppress **This is unstable API and it is subject to change.*
+ * Returns `true` for scoped coroutines.
+ * Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
+ * Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope.
+ * Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
*/
- protected open val cancelsParent: Boolean get() = true
+ protected open val isScopedCoroutine: Boolean get() = false
/**
* Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal].
@@ -939,20 +970,9 @@
*
* This method is invoked **exactly once** when the final exception of the job is determined
* and before it becomes complete. At the moment of invocation the job and all its children are complete.
- *
- * @suppress **This is unstable API and it is subject to change.*
*/
protected open fun handleJobException(exception: Throwable): Boolean = false
- private fun cancelParent(cause: Throwable): Boolean {
- // CancellationException is considered "normal" and parent is not cancelled when child produces it.
- // This allow parent to cancel its children (normally) without being cancelled itself, unless
- // child crashes and produce some other exception during its completion.
- if (cause is CancellationException) return true
- if (!cancelsParent) return false
- return parentHandle?.childCancelled(cause) == true
- }
-
/**
* Override for completion actions that need to update some external object depending on job's state,
* right before all the waiters for coroutine's completion are notified.
diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt
index 3c902db..8bfaf33 100644
--- a/kotlinx-coroutines-core/common/src/Timeout.kt
+++ b/kotlinx-coroutines-core/common/src/Timeout.kt
@@ -85,9 +85,7 @@
override val defaultResumeMode: Int get() = MODE_DIRECT
override val callerFrame: CoroutineStackFrame? get() = (uCont as? CoroutineStackFrame)
override fun getStackTraceElement(): StackTraceElement? = null
-
- override val cancelsParent: Boolean
- get() = false // it throws exception to parent instead of cancelling it
+ override val isScopedCoroutine: Boolean get() = true
@Suppress("LeakingThis", "Deprecation")
override fun run() {
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index d7e01ab..9e34773 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -126,7 +126,7 @@
return coroutine
}
-private class ProducerCoroutine<E>(
+internal open class ProducerCoroutine<E>(
parentContext: CoroutineContext, channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
override val isActive: Boolean
diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt
index 6147b65..b4ff26d 100644
--- a/kotlinx-coroutines-core/common/src/flow/Builders.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt
@@ -313,7 +313,7 @@
public inline fun <T> callbackFlow(@BuilderInference noinline block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
channelFlow(block)
-// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow
+// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow
private class ChannelFlowBuilder<T>(
private val block: suspend ProducerScope<T>.() -> Unit,
context: CoroutineContext = EmptyCoroutineContext,
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index 77beb37..bf20d2f 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -118,7 +118,6 @@
@Deprecated(message = "withContext in flow body is deprecated, use flowOn instead", level = DeprecationLevel.ERROR)
public fun <T, R> FlowCollector<T>.withContext(context: CoroutineContext, block: suspend () -> R): Unit = error("Should not be called")
-
/**
* `subscribe` is Rx-specific API that has no direct match in flows.
* One can use `launch` instead, for example the following:
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
index 30005af..57a0132 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@@ -63,10 +63,10 @@
scope.broadcast(context, produceCapacity, start, block = collectToFun)
fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
- scope.produce(context, produceCapacity, block = collectToFun)
+ scope.flowProduce(context, produceCapacity, block = collectToFun)
override suspend fun collect(collector: FlowCollector<T>) =
- coroutineScope { // todo: flowScope
+ coroutineScope {
val channel = produceImpl(this)
channel.consumeEach { collector.emit(it) }
}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
new file mode 100644
index 0000000..98f5cec
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.internal.*
+import kotlinx.coroutines.intrinsics.*
+import kotlin.coroutines.*
+import kotlin.coroutines.intrinsics.*
+import kotlinx.coroutines.flow.unsafeFlow as flow
+
+/**
+ * Creates a [CoroutineScope] and calls the specified suspend block with this scope.
+ * This builder is similar to [coroutineScope] with the only exception that it *ties* lifecycle of children
+ * and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled.
+ *
+ * For example:
+ * ```
+ * flowScope {
+ * launch {
+ * throw CancellationException()
+ * }
+ * } // <- CE will be rethrown here
+ * ```
+ */
+internal suspend fun <R> flowScope(@BuilderInference block: suspend CoroutineScope.() -> R): R =
+ suspendCoroutineUninterceptedOrReturn { uCont ->
+ val coroutine = FlowCoroutine(uCont.context, uCont)
+ coroutine.startUndispatchedOrReturn(coroutine, block)
+ }
+
+/**
+ * Creates a flow that also provides a [CoroutineScope] for each collector
+ * Shorthand for:
+ * ```
+ * flow {
+ * flowScope {
+ * ...
+ * }
+ * }
+ * ```
+ * with additional constraint on cancellation.
+ * To cancel child without cancelling itself, `cancel(ChildCancelledException())` should be used.
+ */
+internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(FlowCollector<R>) -> Unit): Flow<R> =
+ flow {
+ val collector = this
+ flowScope { block(collector) }
+ }
+
+/*
+ * Shortcut for produce { flowScope {block() } }
+ */
+internal fun <T> CoroutineScope.flowProduce(
+ context: CoroutineContext,
+ capacity: Int = 0, @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): ReceiveChannel<T> {
+ val channel = Channel<T>(capacity)
+ val newContext = newCoroutineContext(context)
+ val coroutine = FlowProduceCoroutine(newContext, channel)
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+ return coroutine
+}
+
+private class FlowCoroutine<T>(
+ context: CoroutineContext,
+ uCont: Continuation<T>
+) : ScopeCoroutine<T>(context, uCont) {
+
+ public override fun childCancelled(cause: Throwable): Boolean {
+ if (cause is ChildCancelledException) return true
+ return cancelImpl(cause)
+ }
+}
+
+private class FlowProduceCoroutine<T>(
+ parentContext: CoroutineContext,
+ channel: Channel<T>
+) : ProducerCoroutine<T>(parentContext, channel) {
+
+ public override fun childCancelled(cause: Throwable): Boolean {
+ if (cause is ChildCancelledException) return true
+ return cancelImpl(cause)
+ }
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt
similarity index 71%
rename from kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt
rename to kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt
index 6d5a4b4..6c675b3 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt
@@ -11,3 +11,8 @@
* This exception should never escape outside of operator's implementation.
*/
internal expect class AbortFlowException() : CancellationException
+
+/**
+ * Exception used to cancel child of [scopedFlow] without cancelling the whole scope.
+ */
+internal expect class ChildCancelledException() : CancellationException
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
index 32e9b3f..4db3044 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
@@ -60,34 +60,33 @@
*/
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
- return flow {
- coroutineScope {
- val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
- // Channel is not closed deliberately as there is no close with value
- val collector = async {
- collect { value -> values.send(value ?: NULL) }
- }
+ return scopedFlow { downstream ->
+ val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
+ // Channel is not closed deliberately as there is no close with value
+ val collector = async {
+ collect { value -> values.send(value ?: NULL) }
+ }
- var isDone = false
- var lastValue: Any? = null
- while (!isDone) {
- select<Unit> {
- values.onReceive {
- lastValue = it
- }
+ var isDone = false
+ var lastValue: Any? = null
+ while (!isDone) {
+ select<Unit> {
+ values.onReceive {
+ lastValue = it
+ }
- lastValue?.let { value -> // set timeout when lastValue != null
- onTimeout(timeoutMillis) {
- lastValue = null // Consume the value
- emit(NULL.unbox(value))
- }
+ lastValue?.let { value ->
+ // set timeout when lastValue != null
+ onTimeout(timeoutMillis) {
+ lastValue = null // Consume the value
+ downstream.emit(NULL.unbox(value))
}
+ }
- // Close with value 'idiom'
- collector.onAwait {
- if (lastValue != null) emit(NULL.unbox(lastValue))
- isDone = true
- }
+ // Close with value 'idiom'
+ collector.onAwait {
+ if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
+ isDone = true
}
}
}
@@ -112,32 +111,31 @@
*/
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
require(periodMillis > 0) { "Sample period should be positive" }
- return flow {
- coroutineScope {
- val values = produce<Any?>(capacity = Channel.CONFLATED) { // Actually Any, KT-30796
- collect { value -> send(value ?: NULL) }
- }
+ return scopedFlow { downstream ->
+ val values = produce<Any?>(capacity = Channel.CONFLATED) {
+ // Actually Any, KT-30796
+ collect { value -> send(value ?: NULL) }
+ }
- var isDone = false
- var lastValue: Any? = null
- val ticker = fixedPeriodTicker(periodMillis)
- while (!isDone) {
- select<Unit> {
- values.onReceiveOrNull {
- if (it == null) {
- ticker.cancel()
- isDone = true
- } else {
- lastValue = it
- }
+ var isDone = false
+ var lastValue: Any? = null
+ val ticker = fixedPeriodTicker(periodMillis)
+ while (!isDone) {
+ select<Unit> {
+ values.onReceiveOrNull {
+ if (it == null) {
+ ticker.cancel(ChildCancelledException())
+ isDone = true
+ } else {
+ lastValue = it
}
+ }
- // todo: shall be start sampling only when an element arrives or sample aways as here?
- ticker.onReceive {
- val value = lastValue ?: return@onReceive
- lastValue = null // Consume the value
- emit(NULL.unbox(value))
- }
+ // todo: shall be start sampling only when an element arrives or sample aways as here?
+ ticker.onReceive {
+ val value = lastValue ?: return@onReceive
+ lastValue = null // Consume the value
+ downstream.emit(NULL.unbox(value))
}
}
}
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
index f7a6447..0fa6e8a 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
@@ -129,17 +129,16 @@
* produces `aa bb b_last`
*/
@FlowPreview
-public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flow {
- coroutineScope {
- var previousFlow: Job? = null
- collect { value ->
- // Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
- previousFlow?.cancelAndJoin()
- // Undispatched to have better user experience in case of synchronous flows
- previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
- transform(value).collect { innerValue ->
- emit(innerValue)
- }
+public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow { downstream ->
+ var previousFlow: Job? = null
+ collect { value ->
+ // Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
+ previousFlow?.cancel(ChildCancelledException())
+ previousFlow?.join()
+ // Undispatched to have better user experience in case of synchronous flows
+ previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
+ transform(value).collect { innerValue ->
+ downstream.emit(innerValue)
}
}
}
@@ -175,7 +174,7 @@
override suspend fun flowCollect(collector: FlowCollector<T>) {
// this function should not have been invoked when channel was explicitly requested
check(capacity == OPTIONAL_CHANNEL)
- coroutineScope { // todo: flowScope
+ flowScope {
mergeImpl(this, collector.asConcurrentFlowCollector())
}
}
diff --git a/kotlinx-coroutines-core/common/src/internal/Scopes.kt b/kotlinx-coroutines-core/common/src/internal/Scopes.kt
index 3361694..9197ec8 100644
--- a/kotlinx-coroutines-core/common/src/internal/Scopes.kt
+++ b/kotlinx-coroutines-core/common/src/internal/Scopes.kt
@@ -17,13 +17,12 @@
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
final override fun getStackTraceElement(): StackTraceElement? = null
+ final override val isScopedCoroutine: Boolean get() = true
+
override val defaultResumeMode: Int get() = MODE_DIRECT
internal val parent: Job? get() = parentContext[Job]
- override val cancelsParent: Boolean
- get() = false // it throws exception to parent instead of cancelling it
-
@Suppress("UNCHECKED_CAST")
override fun afterCompletionInternal(state: Any?, mode: Int) {
if (state is CompletedExceptionally) {
diff --git a/kotlinx-coroutines-core/common/test/SupervisorTest.kt b/kotlinx-coroutines-core/common/test/SupervisorTest.kt
index fae7091..535073e 100644
--- a/kotlinx-coroutines-core/common/test/SupervisorTest.kt
+++ b/kotlinx-coroutines-core/common/test/SupervisorTest.kt
@@ -219,4 +219,22 @@
yield() // to coroutineScope
finish(7)
}
+
+ @Test
+ fun testSupervisorJobCancellationException() = runTest {
+ val job = SupervisorJob()
+ val child = launch(job + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
+ expect(1)
+ hang {
+ expect(3)
+ }
+ }
+
+ yield()
+ expect(2)
+ child.cancelAndJoin()
+ job.complete()
+ job.join()
+ finish(4)
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
index 0ae30e8..a77f8fa 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
@@ -81,4 +81,63 @@
assertFailsWith<TestException>(flow)
finish(4)
}
+
+ @Test
+ fun testMergeOneCoroutineWithCancellation() = runTest {
+ val flow = flowOf(1, 2, 3)
+ val f = flow.mergeOneCoroutine(flow).take(2)
+ assertEquals(listOf(1, 1), f.toList())
+ }
+
+ @Test
+ fun testMergeTwoCoroutinesWithCancellation() = runTest {
+ val flow = flowOf(1, 2, 3)
+ val f = flow.mergeTwoCoroutines(flow).take(2)
+ assertEquals(listOf(1, 1), f.toList())
+ }
+
+ private fun Flow<Int>.mergeTwoCoroutines(other: Flow<Int>): Flow<Int> = channelFlow {
+ launch {
+ collect { send(it); yield() }
+ }
+ launch {
+ other.collect { send(it) }
+ }
+ }
+
+ private fun Flow<Int>.mergeOneCoroutine(other: Flow<Int>): Flow<Int> = channelFlow {
+ launch {
+ collect { send(it); yield() }
+ }
+
+ other.collect { send(it); yield() }
+ }
+
+ @Test
+ fun testBufferWithTimeout() = runTest {
+ fun Flow<Int>.bufferWithTimeout(): Flow<Int> = channelFlow {
+ expect(2)
+ launch {
+ expect(3)
+ hang {
+ expect(5)
+ }
+ }
+ launch {
+ expect(4)
+ collect {
+ withTimeout(-1) {
+ send(it)
+ }
+ expectUnreached()
+ }
+ expectUnreached()
+ }
+ }
+
+ val flow = flowOf(1, 2, 3).bufferWithTimeout()
+ expect(1)
+ assertFailsWith<TimeoutCancellationException>(flow)
+ finish(6)
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt
index d992d06..a6b5340 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt
@@ -45,4 +45,3 @@
finish(3)
}
}
-
diff --git a/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt b/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt
new file mode 100644
index 0000000..d41ab88
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class FlowScopeTest : TestBase() {
+
+ @Test
+ fun testCancellation() = runTest {
+ assertFailsWith<CancellationException> {
+ flowScope {
+ expect(1)
+ val child = launch {
+ expect(3)
+ hang { expect(5) }
+ }
+ expect(2)
+ yield()
+ expect(4)
+ child.cancel()
+ }
+ }
+ finish(6)
+ }
+
+ @Test
+ fun testCancellationWithChildCancelled() = runTest {
+ flowScope {
+ expect(1)
+ val child = launch {
+ expect(3)
+ hang { expect(5) }
+ }
+ expect(2)
+ yield()
+ expect(4)
+ child.cancel(ChildCancelledException())
+ }
+ finish(6)
+ }
+
+ @Test
+ fun testCancellationWithSuspensionPoint() = runTest {
+ assertFailsWith<CancellationException> {
+ flowScope {
+ expect(1)
+ val child = launch {
+ expect(3)
+ hang { expect(6) }
+ }
+ expect(2)
+ yield()
+ expect(4)
+ child.cancel()
+ hang { expect(5) }
+ }
+ }
+ finish(7)
+ }
+
+ @Test
+ fun testNestedScopes() = runTest {
+ assertFailsWith<CancellationException> {
+ flowScope {
+ flowScope {
+ launch {
+ throw CancellationException(null)
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
index bda9927..54244f0 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
@@ -197,6 +197,46 @@
assertFailsWith<TestException>(flow)
finish(2)
}
+
+ @Test
+ fun testCancellationExceptionUpstream() = runTest {
+ val f1 = flow {
+ expect(1)
+ emit(1)
+ throw CancellationException("")
+ }
+ val f2 = flow {
+ emit(1)
+ hang { expect(3) }
+ }
+
+ val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { expect(2) }
+ assertFailsWith<CancellationException>(flow)
+ finish(4)
+ }
+
+ @Test
+ fun testCancellationExceptionDownstream() = runTest {
+ val f1 = flow {
+ emit(1)
+ expect(2)
+ hang { expect(5) }
+ }
+ val f2 = flow {
+ emit(1)
+ expect(3)
+ hang { expect(6) }
+ }
+
+ val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach {
+ expect(1)
+ yield()
+ expect(4)
+ throw CancellationException("")
+ }
+ assertFailsWith<CancellationException>(flow)
+ finish(7)
+ }
}
class CombineLatestTest : CombineLatestTestBase() {
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
index 607d4cd..2a6e9c1 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
@@ -95,20 +95,25 @@
}
@Test
- fun testUpstreamError() = runTest {
+ fun testUpstreamError()= testUpstreamError(TimeoutCancellationException(""))
+
+ @Test
+ fun testUpstreamErrorCancellation() = testUpstreamError(TimeoutCancellationException(""))
+
+ private inline fun <reified T: Throwable> testUpstreamError(cause: T) = runTest {
val latch = Channel<Unit>()
val flow = flow {
expect(1)
emit(1)
expect(2)
latch.receive()
- throw TestException()
+ throw cause
}.debounce(1).map {
latch.send(Unit)
hang { expect(3) }
}
- assertFailsWith<TestException>(flow)
+ assertFailsWith<T>(flow)
finish(4)
}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
index 5d007c3..6069ae6 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
@@ -36,4 +36,41 @@
consumer.cancelAndJoin()
finish(3)
}
+
+ @Test
+ fun testCancellationExceptionDownstream() = runTest {
+ val flow = flow {
+ emit(1)
+ hang { expect(2) }
+ }.flatMapMerge {
+ flow {
+ emit(it)
+ expect(1)
+ throw CancellationException("")
+ }
+ }
+
+ assertFailsWith<CancellationException>(flow)
+ finish(3)
+ }
+
+ @Test
+ fun testCancellationExceptionUpstream() = runTest {
+ val flow = flow {
+ expect(1)
+ emit(1)
+ expect(2)
+ yield()
+ throw CancellationException("")
+ }.flatMapMerge {
+ flow {
+ expect(3)
+ emit(it)
+ hang { expect(4) }
+ }
+ }
+
+ assertFailsWith<CancellationException>(flow)
+ finish(5)
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
index 49df21d..4adc354 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
@@ -234,6 +234,33 @@
finish(6)
}
+ @Test
+ fun testTimeoutExceptionUpstream() = runTest {
+ val flow = flow {
+ emit(1)
+ yield()
+ withTimeout(-1) {}
+ emit(42)
+ }.flowOn(NamedDispatchers("foo")).onEach {
+ expect(1)
+ }
+ assertFailsWith<TimeoutCancellationException>(flow)
+ finish(2)
+ }
+
+ @Test
+ fun testTimeoutExceptionDownstream() = runTest {
+ val flow = flow {
+ emit(1)
+ hang { expect(2) }
+ }.flowOn(NamedDispatchers("foo")).onEach {
+ expect(1)
+ withTimeout(-1) {}
+ }
+ assertFailsWith<TimeoutCancellationException>(flow)
+ finish(3)
+ }
+
private inner class Source(private val value: Int) {
public var contextName: String = "unknown"
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt
index 055f847..a785814 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt
@@ -199,4 +199,33 @@
ensureActive()
finish(5)
}
+
+ @Test
+ fun testTimeoutException() = runTest {
+ val flow = flow {
+ emit(1)
+ yield()
+ withTimeout(-1) {}
+ emit(42)
+ }.flowWith(NamedDispatchers("foo")) {
+ onEach { expect(1) }
+ }
+ assertFailsWith<TimeoutCancellationException>(flow)
+ finish(2)
+ }
+
+ @Test
+ fun testTimeoutExceptionDownstream() = runTest {
+ val flow = flow {
+ emit(1)
+ hang { expect(2) }
+ }.flowWith(NamedDispatchers("foo")) {
+ onEach {
+ expect(1)
+ withTimeout(-1) {}
+ }
+ }
+ assertFailsWith<TimeoutCancellationException>(flow)
+ finish(3)
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt
index e77b128..9c96352 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt
@@ -169,20 +169,25 @@
}
@Test
- fun testUpstreamError() = runTest {
+ fun testUpstreamError() = testUpstreamError(TestException())
+
+ @Test
+ fun testUpstreamErrorCancellationException() = testUpstreamError(CancellationException(""))
+
+ private inline fun <reified T: Throwable> testUpstreamError(cause: T) = runTest {
val latch = Channel<Unit>()
val flow = flow {
expect(1)
emit(1)
expect(2)
latch.receive()
- throw TestException()
+ throw cause
}.sample(1).map {
latch.send(Unit)
hang { expect(3) }
}
- assertFailsWith<TestException>(flow)
+ assertFailsWith<T>(flow)
finish(4)
}
@@ -219,7 +224,6 @@
finish(3)
}
-
@Test
fun testUpstreamErrorSampleNotTriggeredInIsolatedContext() = runTest {
val flow = flow {
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt
index 933bb16..fabca72 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt
@@ -113,4 +113,10 @@
assertFailsWith<TestException>(flow)
finish(5)
}
+
+ @Test
+ fun testTake() = runTest {
+ val flow = flowOf(1, 2, 3, 4, 5).switchMap { flowOf(it) }
+ assertEquals(listOf(1), flow.take(1).toList())
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt
index decd230..b28320c 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt
@@ -93,7 +93,7 @@
}
@Test
- fun testCancesWhenFlowIsDone2() = runTest {
+ fun testCancelWhenFlowIsDone2() = runTest {
val f1 = flow<String> {
emit("1")
emit("2")
@@ -189,4 +189,52 @@
assertFailsWith<TestException>(flow)
finish(2)
}
+
+ @Test
+ fun testCancellationUpstream() = runTest {
+ val f1 = flow {
+ expect(1)
+ emit(1)
+ yield()
+ expect(4)
+ throw CancellationException("")
+ }
+
+ val f2 = flow {
+ expect(2)
+ emit(1)
+ expect(5)
+ hang { expect(6) }
+ }
+
+ val flow = f1.zip(f2, { _, _ -> 1 }).onEach { expect(3) }
+ assertFailsWith<CancellationException>(flow)
+ finish(7)
+ }
+
+ @Test
+ fun testCancellationDownstream() = runTest {
+ val f1 = flow {
+ expect(1)
+ emit(1)
+ yield()
+ expect(4)
+ hang { expect(6) }
+ }
+
+ val f2 = flow {
+ expect(2)
+ emit(1)
+ expect(5)
+ hang { expect(7) }
+ }
+
+ val flow = f1.zip(f2, { _, _ -> 1 }).onEach {
+ expect(3)
+ yield()
+ throw CancellationException("")
+ }
+ assertFailsWith<CancellationException>(flow)
+ finish(8)
+ }
}
diff --git a/kotlinx-coroutines-core/js/src/Exceptions.kt b/kotlinx-coroutines-core/js/src/Exceptions.kt
index 83a0cda..f427041 100644
--- a/kotlinx-coroutines-core/js/src/Exceptions.kt
+++ b/kotlinx-coroutines-core/js/src/Exceptions.kt
@@ -48,8 +48,6 @@
(message!!.hashCode() * 31 + job.hashCode()) * 31 + (cause?.hashCode() ?: 0)
}
-internal actual class CoroutinesInternalError actual constructor(message: String, cause: Throwable) : Error(message.withCause(cause))
-
@Suppress("FunctionName")
internal fun IllegalStateException(message: String, cause: Throwable?) =
IllegalStateException(message.withCause(cause))
diff --git a/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/js/src/flow/internal/FlowExceptions.kt
similarity index 72%
rename from kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt
rename to kotlinx-coroutines-core/js/src/flow/internal/FlowExceptions.kt
index d6a9c31..8422f2b 100644
--- a/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt
+++ b/kotlinx-coroutines-core/js/src/flow/internal/FlowExceptions.kt
@@ -7,3 +7,4 @@
import kotlinx.coroutines.*
internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed")
+internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled")
diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt
index d8f8ee3..52841cd 100644
--- a/kotlinx-coroutines-core/jvm/src/Builders.kt
+++ b/kotlinx-coroutines-core/jvm/src/Builders.kt
@@ -59,8 +59,7 @@
private val blockedThread: Thread,
private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true) {
- override val cancelsParent: Boolean
- get() = false // it throws exception to parent instead of cancelling it
+ override val isScopedCoroutine: Boolean get() = true
override fun afterCompletionInternal(state: Any?, mode: Int) {
// wake up blocked thread
diff --git a/kotlinx-coroutines-core/jvm/src/Exceptions.kt b/kotlinx-coroutines-core/jvm/src/Exceptions.kt
index bc7e92c..7a8f385 100644
--- a/kotlinx-coroutines-core/jvm/src/Exceptions.kt
+++ b/kotlinx-coroutines-core/jvm/src/Exceptions.kt
@@ -80,8 +80,6 @@
(message!!.hashCode() * 31 + job.hashCode()) * 31 + (cause?.hashCode() ?: 0)
}
-internal actual class CoroutinesInternalError actual constructor(message: String, cause: Throwable) : Error(message, cause)
-
@Suppress("NOTHING_TO_INLINE")
internal actual inline fun Throwable.addSuppressedThrowable(other: Throwable) =
- addSuppressed(other)
+ addSuppressed(other)
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
index ee41a0a..ffabb99 100644
--- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
+++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
@@ -127,7 +127,6 @@
channel: Channel<E>,
active: Boolean
) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E> {
- override val cancelsParent: Boolean get() = true
override fun onCancelling(cause: Throwable?) {
_channel.cancel(cause?.let {
diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt
deleted file mode 100644
index 7ff34e7..0000000
--- a/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt
+++ /dev/null
@@ -1,11 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow.internal
-
-import kotlinx.coroutines.*
-
-internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") {
- override fun fillInStackTrace(): Throwable = this
-}
diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt
new file mode 100644
index 0000000..d8d4d21
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+
+internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") {
+ override fun fillInStackTrace(): Throwable {
+ if (DEBUG) super.fillInStackTrace()
+ return this
+ }
+}
+
+internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled") {
+ override fun fillInStackTrace(): Throwable {
+ if (DEBUG) super.fillInStackTrace()
+ return this
+ }
+}
diff --git a/kotlinx-coroutines-core/native/src/Builders.kt b/kotlinx-coroutines-core/native/src/Builders.kt
index 82fd81a..0dc90d5 100644
--- a/kotlinx-coroutines-core/native/src/Builders.kt
+++ b/kotlinx-coroutines-core/native/src/Builders.kt
@@ -54,8 +54,7 @@
parentContext: CoroutineContext,
private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true) {
- override val cancelsParent: Boolean
- get() = false // it throws exception to parent instead of cancelling it
+ override val isScopedCoroutine: Boolean get() = true
@Suppress("UNCHECKED_CAST")
fun joinBlocking(): T = memScoped {
diff --git a/kotlinx-coroutines-core/native/src/Exceptions.kt b/kotlinx-coroutines-core/native/src/Exceptions.kt
index 29c3ce5..109b910 100644
--- a/kotlinx-coroutines-core/native/src/Exceptions.kt
+++ b/kotlinx-coroutines-core/native/src/Exceptions.kt
@@ -48,8 +48,6 @@
(message!!.hashCode() * 31 + job.hashCode()) * 31 + (cause?.hashCode() ?: 0)
}
-internal actual class CoroutinesInternalError actual constructor(message: String, cause: Throwable) : Error(message.withCause(cause))
-
@Suppress("FunctionName")
internal fun IllegalStateException(message: String, cause: Throwable?) =
IllegalStateException(message.withCause(cause))
diff --git a/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt
deleted file mode 100644
index d6a9c31..0000000
--- a/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt
+++ /dev/null
@@ -1,9 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow.internal
-
-import kotlinx.coroutines.*
-
-internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed")
diff --git a/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt
similarity index 71%
copy from kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt
copy to kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt
index d6a9c31..4a291ea 100644
--- a/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt
+++ b/kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt
@@ -7,3 +7,5 @@
import kotlinx.coroutines.*
internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed")
+internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled")
+