Atomically start coroutines in intermediate Flow operators to ensure proper termination, including execution of finally blocks and onCompletion operators (#1829)
Fixes #1825
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index 26bd544..24fd399 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -115,6 +115,7 @@
public fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
@@ -122,7 +123,7 @@
val newContext = newCoroutineContext(context)
val coroutine = ProducerCoroutine(newContext, channel)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
- coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+ coroutine.start(start, coroutine, block)
return coroutine
}