Atomically start coroutines in intermediate Flow operators in order to ensure proper termination, including finally blocks and onCompletion operators (#1829)
Fixes #1825
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
index 04a4e99..76a0bed 100644
--- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
+++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -746,10 +746,10 @@
public final class kotlinx/coroutines/channels/ProduceKt {
public static final fun awaitClose (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun awaitClose$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
- public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
- public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
+ public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
+ public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
}
public abstract interface class kotlinx/coroutines/channels/ProducerScope : kotlinx/coroutines/CoroutineScope, kotlinx/coroutines/channels/SendChannel {
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index 26bd544..24fd399 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -115,6 +115,7 @@
public fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
@@ -122,7 +123,7 @@
val newContext = newCoroutineContext(context)
val coroutine = ProducerCoroutine(newContext, channel)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
- coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+ coroutine.start(start, coroutine, block)
return coroutine
}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
index 2b62be4..8a18bff 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@@ -68,8 +68,16 @@
open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> =
scope.broadcast(context, produceCapacity, start, block = collectToFun)
+ /**
+ * Here we use ATOMIC start for a reason (#1825).
+ * NB: [produceImpl] is used for [flowOn].
+ * For non-atomic start it is possible to observe the situation,
+ * where the pipeline after the [flowOn] call successfully executes (mostly, its `onCompletion`)
+ * handlers, while the pipeline before does not, because it was cancelled during its dispatch.
+ * Thus `onCompletion` and `finally` blocks won't be executed and it may lead to a different kinds of memory leaks.
+ */
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
- scope.produce(context, produceCapacity, block = collectToFun)
+ scope.produce(context, produceCapacity, start = CoroutineStart.ATOMIC, block = collectToFun)
override suspend fun collect(collector: FlowCollector<T>) =
coroutineScope {
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
index 0ce0c33..acc6ca0 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
@@ -59,7 +59,7 @@
val channel = Channel<T>(capacity)
val newContext = newCoroutineContext(context)
val coroutine = FlowProduceCoroutine(newContext, channel)
- coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+ coroutine.start(CoroutineStart.ATOMIC, coroutine, block)
return coroutine
}
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
index 8dd6e3c..f93d039 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
@@ -265,4 +265,15 @@
fail()
}
}
+
+ @Test
+ fun testProduceInAtomicity() = runTest {
+ val flow = flowOf(1).onCompletion { expect(2) }
+ val scope = CoroutineScope(wrapperDispatcher())
+ flow.produceIn(scope)
+ expect(1)
+ scope.cancel()
+ scope.coroutineContext[Job]?.join()
+ finish(3)
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
index 511a003..684923c 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
@@ -38,6 +38,35 @@
}
@Test
+ fun testAtomicStart() = runTest {
+ try {
+ coroutineScope {
+ val job = coroutineContext[Job]!!
+ val flow = flow {
+ expect(3)
+ emit(1)
+ }
+ .onCompletion { expect(5) }
+ .flatMapMerge {
+ expect(4)
+ flowOf(it).onCompletion { expectUnreached() } }
+ .onCompletion { expect(6) }
+
+ launch {
+ expect(1)
+ flow.collect()
+ }
+ launch {
+ expect(2)
+ job.cancel()
+ }
+ }
+ } catch (e: CancellationException) {
+ finish(7)
+ }
+ }
+
+ @Test
fun testCancellationExceptionDownstream() = runTest {
val flow = flow {
emit(1)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
index 34c0476..f8350ff 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
@@ -277,6 +277,33 @@
}
@Test
+ fun testAtomicStart() = runTest {
+ try {
+ coroutineScope {
+ val job = coroutineContext[Job]!!
+ val flow = flow {
+ expect(3)
+ emit(1)
+ }
+ .onCompletion { expect(4) }
+ .flowOn(wrapperDispatcher())
+ .onCompletion { expect(5) }
+
+ launch {
+ expect(1)
+ flow.collect()
+ }
+ launch {
+ expect(2)
+ job.cancel()
+ }
+ }
+ } catch (e: CancellationException) {
+ finish(6)
+ }
+ }
+
+ @Test
fun testException() = runTest {
val flow = flow {
emit(314)