New flow builder: channelFlow (and its alias callbackFlow) and supple… (#1214)
New flow builder: channelFlow (and its alias callbackFlow) and the supplementary ProducerScope.awaitClose method
Rationale:
* Can be used in different contexts without breaking context preservation
* Can be used to build concurrent operators such as merge
* Can be used to integrate with callbacks
* Is less error-prone than flowViaChannel because it requires an explicit awaitClose() call
Partially fixes #1210
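
Illustration of the callback-integration point above. This is a minimal sketch, not code from this change: TickSource, fire, ticks and listenerCount are hypothetical names invented for the example, and it assumes a current kotlinx.coroutines release where callbackFlow, awaitClose and trySend are available.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based API, invented purely for this example.
class TickSource {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (Int) -> Unit) { listeners += listener }
    fun unregister(listener: (Int) -> Unit) { listeners -= listener }
    // Delivers a value to a snapshot of the currently registered listeners.
    fun fire(value: Int) { listeners.toList().forEach { it(value) } }
}

// Wraps the callback API into a Flow. The explicit awaitClose call keeps the
// producer alive until the collector is done, then unregisters the listener.
fun TickSource.ticks(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value -> trySend(value) }
    this@ticks.register(listener)
    awaitClose { this@ticks.unregister(listener) }
}

fun main() = runBlocking {
    val source = TickSource()
    val collected = async { source.ticks().take(3).toList() }
    // Let the collector start and register its listener before firing.
    while (source.listenerCount == 0) yield()
    repeat(3) { source.fire(it) }
    println(collected.await())
    println("listeners left: ${source.listenerCount}")
}
```

The cleanup lives in the lambda passed to awaitClose rather than after the call, because on cancellation the call resumes with an exception and code following it would be skipped.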
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index 9d1df1d..8d34265 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -26,6 +26,36 @@
}
/**
+ * Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]
+ * and invokes the given [block] before resuming the coroutine.
+ *
+ * Note that when the producer channel is cancelled, this function resumes with a cancellation exception,
+ * so code placed after the call to this function is not executed in case of cancellation.
+ * That is why this function takes a lambda parameter.
+ *
+ * Example of usage:
+ * ```
+ * val callbackEventsStream = produce {
+ *     val disposable = registerChannelInCallback(channel)
+ *     awaitClose { disposable.dispose() }
+ * }
+ * ```
+ */
+@ExperimentalCoroutinesApi
+public suspend fun <T> ProducerScope<T>.awaitClose(block: () -> Unit = {}) {
+    check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can be invoked only from the producer context" }
+    try {
+        suspendCancellableCoroutine<Unit> { cont ->
+            invokeOnClose {
+                cont.resume(Unit)
+            }
+        }
+    } finally {
+        block()
+    }
+}
+
+/**
* Launches new coroutine to produce a stream of values by sending them to a channel
* and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
* object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.