New flow builder: channelFlow (and its alias callbackFlow) and suppleā€¦ (#1214)

New flow builder: channelFlow (and its alias callbackFlow) and supplementary ProducerScope.await method

Rationale:

    * Can be used in different context without breaking context preservation
    * Can be used to build concurrent operators such as merge
    * Can be used to integrate with callbacks
    * Is less error-prone than flowViaChannel because requires explicit await() call

Partially fixes #1210
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index 9d1df1d..8d34265 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -26,6 +26,36 @@
 }
 
 /**
+ * Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]
+ * and invokes the given [block] before resuming the coroutine.
+ *
+ * Note that when producer channel is cancelled this function resumes with cancellation exception,
+ * so putting the code after calling this function would not lead to its execution in case of cancellation.
+ * That is why this code takes a lambda parameter.
+ *
+ * Example of usage:
+ * ```
+ * val callbackEventsStream = produce {
+ *     val disposable = registerChannelInCallback(channel)
+ *     awaitClose { disposable.dispose() }
+ * }
+ * ```
+ */
+@ExperimentalCoroutinesApi
+public suspend fun <T> ProducerScope<T>.awaitClose(block: () -> Unit = {}) {
+    check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can be invoke only from the producer context" }
+    try {
+        suspendCancellableCoroutine<Unit> { cont ->
+            invokeOnClose {
+                cont.resume(Unit)
+            }
+        }
+    } finally {
+        block()
+    }
+}
+
+/**
  * Launches new coroutine to produce a stream of values by sending them to a channel
  * and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
  * object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt
index 06a5c00..733bf63 100644
--- a/kotlinx-coroutines-core/common/src/flow/Builders.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt
@@ -200,38 +200,14 @@
 }
 
 /**
- * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
- * that is provided to the builder's [block] of code. It allows elements to be
- * produced by the code that is running in a different context, e.g. from a callback-based API.
- *
- * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
- * on the resulting flow. The [block] is not suspending deliberately, if you need suspending scope, [flow] builder
- * should be used instead.
- *
- * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
- * The provided channel can later be used by any external service to communicate with flow and its buffer determines
- * backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used).
- *
- * Example of usage:
- * ```
- * fun flowFrom(api: CallbackBasedApi): Flow<T> = flowViaChannel { channel ->
- *     val callback = object : Callback { // implementation of some callback interface
- *         override fun onNextValue(value: T) {
- *             channel.offer(value) // Note: offer drops value when buffer is full
- *         }
- *         override fun onApiError(cause: Throwable) {
- *             channel.cancel("API Error", CancellationException(cause))
- *         }
- *         override fun onCompleted() = channel.close()
- *     }
- *     api.register(callback)
- *     channel.invokeOnClose {
- *         api.unregister(callback)
- *     }
- * }
- * ```
+ * @suppress
  */
 @FlowPreview
+@Deprecated(
+    message = "Use channelFlow instead",
+    level = DeprecationLevel.WARNING,
+    replaceWith = ReplaceWith("channelFlow(bufferSize, block)")
+)
 public fun <T> flowViaChannel(
     bufferSize: Int = 16,
     @BuilderInference block: CoroutineScope.(channel: SendChannel<T>) -> Unit
@@ -249,3 +225,90 @@
         }
     }
 }
+
+/**
+ * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
+ * that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
+ * produced by the code that is running in a different context or running concurrently.
+ * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
+ * on the resulting flow.
+ *
+ * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used concurrently from different contexts.
+ * The resulting flow will complete as soon as [ProducerScope], to artificially prolong it [awaitClose] can be used.
+ * For more detailed example please refer to [callbackFlow] documentation.
+ *
+ * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
+ * The provided channel can later be used by any external service to communicate with the flow and its buffer determines
+ * backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
+ *
+ * Examples of usage:
+ * ```
+ * fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
+ *     launch {
+ *         collect { value -> send(value) }
+ *     }
+ *     other.collect { value -> send(value) }
+ * }
+ *
+ * fun <T> contextualFlow(): Flow<T> = channelFlow {
+ *     launch(Dispatchers.IO) {
+ *         send(computeIoValue())
+ *     }
+ *
+ *     launch(Dispatchers.Default) {
+ *         send(computeCpuValue())
+ *     }
+ * }
+ * ```
+ */
+@FlowPreview
+public fun <T> channelFlow(bufferSize: Int = 16, @BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
+    flow {
+        coroutineScope {
+            val channel = produce(capacity = bufferSize, block = block)
+            channel.consumeEach { value ->
+                emit(value)
+            }
+        }
+    }
+
+/**
+ * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
+ * that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
+ * produced by the code that is running in a different context or running concurrently.
+ *
+ * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
+ * on the resulting flow.
+ *
+ * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used from any context,
+ * e.g. from the callback-based API. The flow completes as soon as its scope completes, thus if you are using channel from the
+ * callback-based API, to artificially prolong scope lifetime and avoid memory-leaks related to unregistered resources,
+ * [awaitClose] extension should be used. [awaitClose] argument will be invoked when either flow consumer cancels flow collection
+ * or when callback-based API invokes [SendChannel.close] manually.
+ *
+ * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
+ * The provided channel can later be used by any external service to communicate with the flow and its buffer determines
+ * backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
+ *
+ * Example of usage:
+ * ```
+ * fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
+ *     val callback = object : Callback { // implementation of some callback interface
+ *         override fun onNextValue(value: T) {
+ *             // Note: offer drops value when buffer is full
+ *             // Channel.UNLIMITED can be used to avoid overfill
+ *             offer(value)
+ *         }
+ *         override fun onApiError(cause: Throwable) {
+ *             cancel(CancellationException("API Error", cause))
+ *         }
+ *         override fun onCompleted() = channel.close()
+ *     }
+ *     api.register(callback)
+ *     // Suspend until either onCompleted or external cancellation are invoked
+ *     await { api.unregister(callback) }
+ * }
+ * ```
+ */
+public inline fun <T> callbackFlow(bufferSize: Int = 16, @BuilderInference crossinline block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
+    channelFlow(bufferSize) { block() }