New flow builder: channelFlow (and its alias callbackFlow) and supple… (#1214)

New flow builder: channelFlow (and its alias callbackFlow) and supplementary ProducerScope.await method

Rationale:

    * Can be used in different context without breaking context preservation
    * Can be used to build concurrent operators such as merge
    * Can be used to integrate with callbacks
    * Is less error-prone than flowViaChannel because requires explicit await() call

Partially fixes #1210
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index f986b64..dc3180b 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -718,6 +718,8 @@
 }
 
 public final class kotlinx/coroutines/channels/ProduceKt {
+	public static final fun awaitClose (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static synthetic fun awaitClose$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
 	public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
 	public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
 	public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
@@ -796,6 +798,10 @@
 	public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
 	public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
+	public static final fun callbackFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static synthetic fun callbackFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun channelFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static synthetic fun channelFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index 9d1df1d..8d34265 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -26,6 +26,36 @@
 }
 
 /**
+ * Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]
+ * and invokes the given [block] before resuming the coroutine.
+ *
+ * Note that when producer channel is cancelled this function resumes with cancellation exception,
+ * so putting the code after calling this function would not lead to its execution in case of cancellation.
+ * That is why this code takes a lambda parameter.
+ *
+ * Example of usage:
+ * ```
+ * val callbackEventsStream = produce {
+ *     val disposable = registerChannelInCallback(channel)
+ *     awaitClose { disposable.dispose() }
+ * }
+ * ```
+ */
+@ExperimentalCoroutinesApi
+public suspend fun <T> ProducerScope<T>.awaitClose(block: () -> Unit = {}) {
+    check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can be invoke only from the producer context" }
+    try {
+        suspendCancellableCoroutine<Unit> { cont ->
+            invokeOnClose {
+                cont.resume(Unit)
+            }
+        }
+    } finally {
+        block()
+    }
+}
+
+/**
  * Launches new coroutine to produce a stream of values by sending them to a channel
  * and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
  * object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt
index 06a5c00..733bf63 100644
--- a/kotlinx-coroutines-core/common/src/flow/Builders.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt
@@ -200,38 +200,14 @@
 }
 
 /**
- * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
- * that is provided to the builder's [block] of code. It allows elements to be
- * produced by the code that is running in a different context, e.g. from a callback-based API.
- *
- * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
- * on the resulting flow. The [block] is not suspending deliberately, if you need suspending scope, [flow] builder
- * should be used instead.
- *
- * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
- * The provided channel can later be used by any external service to communicate with flow and its buffer determines
- * backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used).
- *
- * Example of usage:
- * ```
- * fun flowFrom(api: CallbackBasedApi): Flow<T> = flowViaChannel { channel ->
- *     val callback = object : Callback { // implementation of some callback interface
- *         override fun onNextValue(value: T) {
- *             channel.offer(value) // Note: offer drops value when buffer is full
- *         }
- *         override fun onApiError(cause: Throwable) {
- *             channel.cancel("API Error", CancellationException(cause))
- *         }
- *         override fun onCompleted() = channel.close()
- *     }
- *     api.register(callback)
- *     channel.invokeOnClose {
- *         api.unregister(callback)
- *     }
- * }
- * ```
+ * @suppress
  */
 @FlowPreview
+@Deprecated(
+    message = "Use channelFlow instead",
+    level = DeprecationLevel.WARNING,
+    replaceWith = ReplaceWith("channelFlow(bufferSize, block)")
+)
 public fun <T> flowViaChannel(
     bufferSize: Int = 16,
     @BuilderInference block: CoroutineScope.(channel: SendChannel<T>) -> Unit
@@ -249,3 +225,90 @@
         }
     }
 }
+
+/**
+ * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
+ * that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
+ * produced by the code that is running in a different context or running concurrently.
+ * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
+ * on the resulting flow.
+ *
+ * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used concurrently from different contexts.
+ * The resulting flow will complete as soon as [ProducerScope], to artificially prolong it [awaitClose] can be used.
+ * For more detailed example please refer to [callbackFlow] documentation.
+ *
+ * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
+ * The provided channel can later be used by any external service to communicate with the flow and its buffer determines
+ * backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
+ *
+ * Examples of usage:
+ * ```
+ * fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
+ *     launch {
+ *         collect { value -> send(value) }
+ *     }
+ *     other.collect { value -> send(value) }
+ * }
+ *
+ * fun <T> contextualFlow(): Flow<T> = channelFlow {
+ *     launch(Dispatchers.IO) {
+ *         send(computeIoValue())
+ *     }
+ *
+ *     launch(Dispatchers.Default) {
+ *         send(computeCpuValue())
+ *     }
+ * }
+ * ```
+ */
+@FlowPreview
+public fun <T> channelFlow(bufferSize: Int = 16, @BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
+    flow {
+        coroutineScope {
+            val channel = produce(capacity = bufferSize, block = block)
+            channel.consumeEach { value ->
+                emit(value)
+            }
+        }
+    }
+
+/**
+ * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
+ * that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
+ * produced by the code that is running in a different context or running concurrently.
+ *
+ * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
+ * on the resulting flow.
+ *
+ * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used from any context,
+ * e.g. from the callback-based API. The flow completes as soon as its scope completes, thus if you are using channel from the
+ * callback-based API, to artificially prolong scope lifetime and avoid memory-leaks related to unregistered resources,
+ * [awaitClose] extension should be used. [awaitClose] argument will be invoked when either flow consumer cancels flow collection
+ * or when callback-based API invokes [SendChannel.close] manually.
+ *
+ * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
+ * The provided channel can later be used by any external service to communicate with the flow and its buffer determines
+ * backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
+ *
+ * Example of usage:
+ * ```
+ * fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
+ *     val callback = object : Callback { // implementation of some callback interface
+ *         override fun onNextValue(value: T) {
+ *             // Note: offer drops value when buffer is full
+ *             // Channel.UNLIMITED can be used to avoid overfill
+ *             offer(value)
+ *         }
+ *         override fun onApiError(cause: Throwable) {
+ *             cancel(CancellationException("API Error", cause))
+ *         }
+ *         override fun onCompleted() = channel.close()
+ *     }
+ *     api.register(callback)
+ *     // Suspend until either onCompleted or external cancellation are invoked
+ *     await { api.unregister(callback) }
+ * }
+ * ```
+ */
+public inline fun <T> callbackFlow(bufferSize: Int = 16, @BuilderInference crossinline block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
+    channelFlow(bufferSize) { block() }
diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
index 5137dd7..cbaf708 100644
--- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
@@ -94,6 +94,60 @@
         cancelOnCompletion(coroutineContext)
     }
 
+    @Test
+    fun testAwaitConsumerCancellation() = runTest {
+        val parent = Job()
+        val channel = produce<Int>(parent) {
+            expect(2)
+            awaitClose { expect(4) }
+        }
+        expect(1)
+        yield()
+        expect(3)
+        channel.cancel()
+        parent.complete()
+        parent.join()
+        finish(5)
+    }
+
+    @Test
+    fun testAwaitProducerCancellation() = runTest {
+        val parent = Job()
+        produce<Int>(parent) {
+            expect(2)
+            launch {
+                expect(3)
+                this@produce.cancel()
+            }
+            awaitClose { expect(4) }
+        }
+        expect(1)
+        parent.complete()
+        parent.join()
+        finish(5)
+    }
+
+    @Test
+    fun testAwaitParentCancellation() = runTest {
+        val parent = Job()
+        produce<Int>(parent) {
+            expect(2)
+            awaitClose { expect(4) }
+        }
+        expect(1)
+        yield()
+        expect(3)
+        parent.cancelAndJoin()
+        finish(5)
+    }
+
+    @Test
+    fun testAwaitIllegalState() = runTest {
+        val channel = produce<Int> {  }
+        @Suppress("RemoveExplicitTypeArguments") // KT-31525
+        assertFailsWith<IllegalStateException> { (channel as ProducerScope<*>).awaitClose<Nothing>() }
+    }
+
     private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {
         val source = Channel<Int>()
         expect(1)
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
new file mode 100644
index 0000000..3c74b0f
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlin.test.*
+
+class ChannelBuildersFlowTest : TestBase() {
+    @Test
+    fun testBroadcastChannelAsFlow() = runTest {
+        val channel = broadcast {
+           repeat(10) {
+               send(it + 1)
+           }
+        }
+
+        val sum = channel.asFlow().sum()
+        assertEquals(55, sum)
+    }
+
+    @Test
+    fun testExceptionInBroadcast() = runTest {
+        expect(1)
+        val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well
+            repeat(10) {
+                send(it + 1)
+            }
+            throw TestException()
+        }
+        assertEquals(15, channel.asFlow().take(5).sum())
+
+        // Workaround for JS bug
+        try {
+            channel.asFlow().collect { /* Do nothing */ }
+            expectUnreached()
+        } catch (e: TestException) {
+            finish(2)
+        }
+    }
+
+    @Test
+    fun testBroadcastChannelAsFlowLimits() = runTest {
+        val channel = BroadcastChannel<Int>(1)
+        val flow = channel.asFlow().map { it * it }.drop(1).take(2)
+
+        var expected = 0
+        launch {
+            assertTrue(channel.offer(1)) // Handed to the coroutine
+            assertTrue(channel.offer(2)) // Buffered
+            assertFalse(channel.offer(3)) // Failed to offer
+            channel.send(3)
+            yield()
+            assertEquals(1, expected)
+            assertTrue(channel.offer(4)) // Handed to the coroutine
+            assertTrue(channel.offer(5)) // Buffered
+            assertFalse(channel.offer(6))  // Failed to offer
+            channel.send(6)
+            assertEquals(2, expected)
+        }
+
+        val sum = flow.sum()
+        assertEquals(13, sum)
+        ++expected
+        val sum2 = flow.sum()
+        assertEquals(61, sum2)
+        ++expected
+    }
+
+    @Test
+    fun flowAsBroadcast() = runTest {
+        val flow = flow {
+            repeat(10) {
+                emit(it)
+            }
+        }
+
+        val channel = flow.broadcastIn(this)
+        assertEquals((0..9).toList(), channel.openSubscription().toList())
+    }
+
+    @Test
+    fun flowAsBroadcastMultipleSubscription() = runTest {
+        val flow = flow {
+            repeat(10) {
+                emit(it)
+            }
+        }
+
+        val broadcast = flow.broadcastIn(this)
+        val channel = broadcast.openSubscription()
+        val channel2 = broadcast.openSubscription()
+
+        assertEquals(0, channel.receive())
+        assertEquals(0, channel2.receive())
+        yield()
+        assertEquals(1, channel.receive())
+        assertEquals(1, channel2.receive())
+
+        channel.cancel()
+        channel2.cancel()
+        yield()
+        ensureActive()
+    }
+
+    @Test
+    fun flowAsBroadcastException() = runTest {
+        val flow = flow {
+            repeat(10) {
+                emit(it)
+            }
+
+            throw TestException()
+        }
+
+        val channel = flow.broadcastIn(this + NonCancellable)
+        assertFailsWith<TestException> { channel.openSubscription().toList() }
+        assertTrue(channel.isClosedForSend) // Failure in the flow fails the channel
+    }
+
+    // Semantics of these tests puzzle me, we should figure out the way to prohibit such chains
+    @Test
+    fun testFlowAsBroadcastAsFlow() = runTest {
+        val flow = flow {
+            emit(1)
+            emit(2)
+            emit(3)
+        }.broadcastIn(this).asFlow()
+
+        assertEquals(6, flow.sum())
+        assertEquals(0, flow.sum()) // Well suddenly flow is no longer idempotent and cold
+    }
+
+    @Test
+    fun testBroadcastAsFlowAsBroadcast() = runTest {
+        val channel = broadcast {
+            send(1)
+        }.asFlow().broadcastIn(this)
+
+        channel.openSubscription().consumeEach {
+            assertEquals(1, it)
+        }
+
+        channel.openSubscription().consumeEach {
+            fail()
+        }
+    }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
index c2051e7..5d0292e 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
@@ -10,141 +10,72 @@
 
 class ChannelFlowTest : TestBase() {
     @Test
-    fun testBroadcastChannelAsFlow() = runTest {
-        val channel = broadcast {
-           repeat(10) {
-               send(it + 1)
-           }
+    fun testRegular() = runTest {
+        val flow = channelFlow {
+            assertTrue(offer(1))
+            assertTrue(offer(2))
+            assertTrue(offer(3))
         }
-
-        val sum = channel.asFlow().sum()
-        assertEquals(55, sum)
+        assertEquals(listOf(1, 2, 3), flow.toList())
     }
 
     @Test
-    fun testExceptionInBroadcast() = runTest {
+    fun testBuffer() = runTest {
+        val flow = channelFlow(bufferSize = 1) {
+            assertTrue(offer(1))
+            assertTrue(offer(2))
+            assertFalse(offer(3))
+        }
+        assertEquals(listOf(1, 2), flow.toList())
+    }
+
+    @Test
+    fun testConflated() = runTest {
+        val flow = channelFlow(bufferSize = Channel.CONFLATED) {
+            assertTrue(offer(1))
+            assertTrue(offer(2))
+        }
+        assertEquals(listOf(1), flow.toList())
+    }
+
+    @Test
+    fun testFailureCancelsChannel() = runTest {
+        val flow = channelFlow {
+            offer(1)
+            invokeOnClose {
+                expect(2)
+            }
+        }.onEach { throw TestException() }
+
         expect(1)
-        val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well
-            repeat(10) {
-                send(it + 1)
+        assertFailsWith<TestException>(flow)
+        finish(3)
+    }
+
+    @Test
+    fun testFailureInSourceCancelsConsumer() = runTest {
+        val flow = channelFlow<Int> {
+            expect(2)
+            throw TestException()
+        }.onEach { expectUnreached() }
+
+        expect(1)
+        assertFailsWith<TestException>(flow)
+        finish(3)
+    }
+
+    @Test
+    fun testScopedCancellation() = runTest {
+        val flow = channelFlow<Int> {
+            expect(2)
+            launch(start = CoroutineStart.ATOMIC) {
+                hang { expect(3) }
             }
             throw TestException()
-        }
-        assertEquals(15, channel.asFlow().take(5).sum())
+        }.onEach { expectUnreached() }
 
-        // Workaround for JS bug
-        try {
-            channel.asFlow().collect { /* Do nothing */ }
-            expectUnreached()
-        } catch (e: TestException) {
-            finish(2)
-        }
-    }
-
-    @Test
-    fun testBroadcastChannelAsFlowLimits() = runTest {
-        val channel = BroadcastChannel<Int>(1)
-        val flow = channel.asFlow().map { it * it }.drop(1).take(2)
-
-        var expected = 0
-        launch {
-            assertTrue(channel.offer(1)) // Handed to the coroutine
-            assertTrue(channel.offer(2)) // Buffered
-            assertFalse(channel.offer(3)) // Failed to offer
-            channel.send(3)
-            yield()
-            assertEquals(1, expected)
-            assertTrue(channel.offer(4)) // Handed to the coroutine
-            assertTrue(channel.offer(5)) // Buffered
-            assertFalse(channel.offer(6))  // Failed to offer
-            channel.send(6)
-            assertEquals(2, expected)
-        }
-
-        val sum = flow.sum()
-        assertEquals(13, sum)
-        ++expected
-        val sum2 = flow.sum()
-        assertEquals(61, sum2)
-        ++expected
-    }
-
-    @Test
-    fun flowAsBroadcast() = runTest {
-        val flow = flow {
-            repeat(10) {
-                emit(it)
-            }
-        }
-
-        val channel = flow.broadcastIn(this)
-        assertEquals((0..9).toList(), channel.openSubscription().toList())
-    }
-
-    @Test
-    fun flowAsBroadcastMultipleSubscription() = runTest {
-        val flow = flow {
-            repeat(10) {
-                emit(it)
-            }
-        }
-
-        val broadcast = flow.broadcastIn(this)
-        val channel = broadcast.openSubscription()
-        val channel2 = broadcast.openSubscription()
-
-        assertEquals(0, channel.receive())
-        assertEquals(0, channel2.receive())
-        yield()
-        assertEquals(1, channel.receive())
-        assertEquals(1, channel2.receive())
-
-        channel.cancel()
-        channel2.cancel()
-        yield()
-        ensureActive()
-    }
-
-    @Test
-    fun flowAsBroadcastException() = runTest {
-        val flow = flow {
-            repeat(10) {
-                emit(it)
-            }
-
-            throw TestException()
-        }
-
-        val channel = flow.broadcastIn(this + NonCancellable)
-        assertFailsWith<TestException> { channel.openSubscription().toList() }
-        assertTrue(channel.isClosedForSend) // Failure in the flow fails the channel
-    }
-
-    // Semantics of these tests puzzle me, we should figure out the way to prohibit such chains
-    @Test
-    fun testFlowAsBroadcastAsFlow() = runTest {
-        val flow = flow {
-            emit(1)
-            emit(2)
-            emit(3)
-        }.broadcastIn(this).asFlow()
-
-        assertEquals(6, flow.sum())
-        assertEquals(0, flow.sum()) // Well suddenly flow is no longer idempotent and cold
-    }
-
-    @Test
-    fun testBroadcastAsFlowAsBroadcast() = runTest {
-        val channel = broadcast {
-            send(1)
-        }.asFlow().broadcastIn(this)
-
-        channel.openSubscription().consumeEach {
-            assertEquals(1, it)
-        }
-
-        channel.openSubscription().consumeEach {
-            fail()
-        }
+        expect(1)
+        assertFailsWith<TestException>(flow)
+        finish(4)
     }
 }
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt
new file mode 100644
index 0000000..d992d06
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlin.test.*
+
+class FlowCallbackTest : TestBase() {
+    @Test
+    fun testClosedPrematurely() = runTest(unhandled = listOf({ e -> e is ClosedSendChannelException })) {
+        val outerScope = this
+        val flow = channelFlow {
+            // ~ callback-based API
+            outerScope.launch(Job()) {
+                expect(2)
+                send(1)
+                expectUnreached()
+            }
+            expect(1)
+        }
+        assertEquals(emptyList(), flow.toList())
+        finish(3)
+    }
+
+    @Test
+    fun testNotClosedPrematurely() = runTest {
+        val outerScope = this
+        val flow = channelFlow<Int> {
+            // ~ callback-based API
+            outerScope.launch(Job()) {
+                expect(2)
+                send(1)
+                close()
+            }
+            expect(1)
+            awaitClose()
+        }
+
+        assertEquals(listOf(1), flow.toList())
+        finish(3)
+    }
+}
+
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt
deleted file mode 100644
index 364cd84..0000000
--- a/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
-import kotlin.test.*
-
-class FlowViaChannelTest : TestBase() {
-    @Test
-    fun testRegular() = runTest {
-        val flow = flowViaChannel<Int> {
-            assertTrue(it.offer(1))
-            assertTrue(it.offer(2))
-            assertTrue(it.offer(3))
-            it.close()
-        }
-        assertEquals(listOf(1, 2, 3), flow.toList())
-    }
-
-    @Test
-    fun testBuffer() = runTest {
-        val flow = flowViaChannel<Int>(bufferSize = 1) {
-            assertTrue(it.offer(1))
-            assertTrue(it.offer(2))
-            assertFalse(it.offer(3))
-            it.close()
-        }
-        assertEquals(listOf(1, 2), flow.toList())
-    }
-
-    @Test
-    fun testConflated() = runTest {
-        val flow = flowViaChannel<Int>(bufferSize = Channel.CONFLATED) {
-            assertTrue(it.offer(1))
-            assertTrue(it.offer(2))
-            it.close()
-        }
-        assertEquals(listOf(1), flow.toList())
-    }
-
-    @Test
-    fun testFailureCancelsChannel() = runTest {
-        val flow = flowViaChannel<Int> {
-            it.offer(1)
-            it.invokeOnClose {
-                expect(2)
-            }
-        }.onEach { throw TestException() }
-
-        expect(1)
-        assertFailsWith<TestException>(flow)
-        finish(3)
-    }
-
-    @Test
-    fun testFailureInSourceCancelsConsumer() = runTest {
-        val flow = flowViaChannel<Int> {
-            expect(2)
-            throw TestException()
-        }.onEach { expectUnreached() }
-
-        expect(1)
-        assertFailsWith<TestException>(flow)
-        finish(3)
-    }
-
-    @Test
-    fun testScopedCancellation() = runTest {
-        val flow = flowViaChannel<Int> {
-            expect(2)
-            launch(start = CoroutineStart.ATOMIC) {
-                hang { expect(3) }
-            }
-            throw TestException()
-        }.onEach { expectUnreached() }
-
-        expect(1)
-        assertFailsWith<TestException>(flow)
-        finish(4)
-    }
-}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/FlowFromChannelTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
similarity index 80%
rename from kotlinx-coroutines-core/jvm/test/flow/FlowFromChannelTest.kt
rename to kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
index 9d7799c..0a66ae6 100644
--- a/kotlinx-coroutines-core/jvm/test/flow/FlowFromChannelTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
@@ -10,7 +10,7 @@
 import kotlin.concurrent.*
 import kotlin.test.*
 
-class FlowFromChannelTest : TestBase() {
+class CallbackFlowTest : TestBase() {
 
     private class CallbackApi(val block: (SendChannel<Int>) -> Unit) {
         var started = false
@@ -39,9 +39,9 @@
             runCatching {  it.offer(++i) }
         }
 
-        val flow = flowViaChannel<Int> { channel ->
+        val flow = channelFlow<Int>(16) {
             api.start(channel)
-            channel.invokeOnClose {
+            awaitClose {
                 api.stop()
             }
         }
@@ -83,9 +83,9 @@
             }
         }
 
-        val flow = flowViaChannel<Int> { channel ->
+        val flow = channelFlow<Int> {
             api.start(channel)
-            channel.invokeOnClose {
+            awaitClose {
                 api.stop()
             }
         }
@@ -106,4 +106,22 @@
         assertTrue(api.started)
         assertTrue(api.stopped)
     }
+
+
+    @Test
+    fun testMergeExample() = runTest {
+        // Too slow on JS
+        withContext(Dispatchers.Default) {
+            val f1 = (1..10_000).asFlow()
+            val f2 = (10_001..20_000).asFlow()
+            assertEquals((1..20_000).toSet(), f1.merge(f2).toSet())
+        }
+    }
+
+    private fun Flow<Int>.merge(other: Flow<Int>): Flow<Int> = channelFlow {
+        launch {
+            collect { send(it) }
+        }
+        other.collect { send(it) }
+    }
 }