Atomically start coroutines in intermediate Flow operators in order to ensure proper termination, including finally blocks and onCompletion operators (#1829)

Fixes #1825
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
index 04a4e99..76a0bed 100644
--- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
+++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -746,10 +746,10 @@
 public final class kotlinx/coroutines/channels/ProduceKt {
 	public static final fun awaitClose (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static synthetic fun awaitClose$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
-	public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
 	public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
-	public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
+	public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
 	public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
+	public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
 }
 
 public abstract interface class kotlinx/coroutines/channels/ProducerScope : kotlinx/coroutines/CoroutineScope, kotlinx/coroutines/channels/SendChannel {
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index 26bd544..24fd399 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -115,6 +115,7 @@
 public fun <E> CoroutineScope.produce(
     context: CoroutineContext = EmptyCoroutineContext,
     capacity: Int = 0,
+    start: CoroutineStart = CoroutineStart.DEFAULT,
     onCompletion: CompletionHandler? = null,
     @BuilderInference block: suspend ProducerScope<E>.() -> Unit
 ): ReceiveChannel<E> {
@@ -122,7 +123,7 @@
     val newContext = newCoroutineContext(context)
     val coroutine = ProducerCoroutine(newContext, channel)
     if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
-    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+    coroutine.start(start, coroutine, block)
     return coroutine
 }
 
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
index 2b62be4..8a18bff 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@@ -68,8 +68,16 @@
     open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> =
         scope.broadcast(context, produceCapacity, start, block = collectToFun)
 
+    /**
+     * Here we use ATOMIC start for a reason (#1825).
+     * NB: [produceImpl] is used for [flowOn].
+     * With a non-atomic start it is possible to observe a situation
+     * where the pipeline after the [flowOn] call successfully executes its handlers (mostly `onCompletion`),
+     * while the pipeline before it does not, because it was cancelled during its dispatch.
+     * Thus its `onCompletion` and `finally` blocks won't be executed, which may lead to various kinds of memory leaks.
+     */
     open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
-        scope.produce(context, produceCapacity, block = collectToFun)
+        scope.produce(context, produceCapacity, start = CoroutineStart.ATOMIC, block = collectToFun)
 
     override suspend fun collect(collector: FlowCollector<T>) =
         coroutineScope {
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
index 0ce0c33..acc6ca0 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
@@ -59,7 +59,7 @@
     val channel = Channel<T>(capacity)
     val newContext = newCoroutineContext(context)
     val coroutine = FlowProduceCoroutine(newContext, channel)
-    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+    coroutine.start(CoroutineStart.ATOMIC, coroutine, block)
     return coroutine
 }
 
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
index 8dd6e3c..f93d039 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
@@ -265,4 +265,15 @@
             fail()
         }
     }
+
+    @Test
+    fun testProduceInAtomicity() = runTest {
+        val flow = flowOf(1).onCompletion { expect(2) }
+        val scope = CoroutineScope(wrapperDispatcher())
+        flow.produceIn(scope)
+        expect(1)
+        scope.cancel()
+        scope.coroutineContext[Job]?.join()
+        finish(3)
+    }
 }
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
index 511a003..684923c 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
@@ -38,6 +38,35 @@
     }
 
     @Test
+    fun testAtomicStart() = runTest {
+        try {
+            coroutineScope {
+                val job = coroutineContext[Job]!!
+                val flow = flow {
+                    expect(3)
+                    emit(1)
+                }
+                    .onCompletion { expect(5) }
+                    .flatMapMerge {
+                        expect(4)
+                        flowOf(it).onCompletion { expectUnreached() } }
+                    .onCompletion { expect(6) }
+
+                launch {
+                    expect(1)
+                    flow.collect()
+                }
+                launch {
+                    expect(2)
+                    job.cancel()
+                }
+            }
+        } catch (e: CancellationException) {
+            finish(7)
+        }
+    }
+
+    @Test
     fun testCancellationExceptionDownstream() = runTest {
         val flow = flow {
             emit(1)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
index 34c0476..f8350ff 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
@@ -277,6 +277,33 @@
     }
 
     @Test
+    fun testAtomicStart() = runTest {
+        try {
+            coroutineScope {
+                val job = coroutineContext[Job]!!
+                val flow = flow {
+                    expect(3)
+                    emit(1)
+                }
+                    .onCompletion { expect(4) }
+                    .flowOn(wrapperDispatcher())
+                    .onCompletion { expect(5) }
+
+                launch {
+                    expect(1)
+                    flow.collect()
+                }
+                launch {
+                    expect(2)
+                    job.cancel()
+                }
+            }
+        } catch (e: CancellationException) {
+            finish(6)
+        }
+    }
+
+    @Test
     fun testException() = runTest {
         val flow = flow {
             emit(314)