New flow builder: channelFlow (and its alias callbackFlow) and supple… (#1214)
New flow builder: channelFlow (and its alias callbackFlow) and supplementary ProducerScope.awaitClose method
Rationale:
* Can be used in different contexts without breaking context preservation
* Can be used to build concurrent operators such as merge
* Can be used to integrate with callbacks
* Is less error-prone than flowViaChannel because it requires an explicit awaitClose() call
Partially fixes #1210
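The callback-integration point above can be illustrated with a minimal sketch. It is not code from this change: NumberListener and NumberApi are hypothetical names invented for the illustration, and it assumes a recent kotlinx.coroutines release, where callbackFlow is called without a bufferSize argument and trySend is the non-suspending counterpart of the offer call used in the KDoc of this commit.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical callback interface, invented for this sketch.
interface NumberListener {
    fun onNext(value: Int)
    fun onCompleted()
}

// Hypothetical callback-based API. It delivers three values and a completion
// signal synchronously inside register() to keep the example deterministic.
class NumberApi {
    var listener: NumberListener? = null
        private set

    fun register(l: NumberListener) {
        listener = l
        for (i in 1..3) l.onNext(i)
        l.onCompleted()
    }

    fun unregister(l: NumberListener) {
        if (listener === l) listener = null
    }
}

fun numbersFrom(api: NumberApi): Flow<Int> = callbackFlow {
    val callback = object : NumberListener {
        override fun onNext(value: Int) {
            // Non-suspending send; the result is ignored here,
            // so a value is dropped if the buffer is full.
            trySend(value)
        }

        override fun onCompleted() {
            // Closes the channel, which lets awaitClose resume.
            close()
        }
    }
    api.register(callback)
    // Keeps the producer alive until the channel is closed or cancelled,
    // then unregisters the callback in either case.
    awaitClose { api.unregister(callback) }
}

fun collectNumbers(api: NumberApi): List<Int> =
    runBlocking { numbersFrom(api).toList() }

fun main() {
    val api = NumberApi()
    println(collectNumbers(api))   // prints [1, 2, 3]
    println(api.listener == null)  // prints true
}
```

Without the awaitClose call the producer block would return right after register(), the flow would complete, and the callback would stay registered, which is the mistake the explicit call is meant to make harder.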
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index 9d1df1d..8d34265 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -26,6 +26,36 @@
}
/**
+ * Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]
+ * and invokes the given [block] before resuming the coroutine.
+ *
+ * Note that when the producer channel is cancelled, this function resumes with a cancellation exception,
+ * so code placed after a call to this function is not executed in case of cancellation.
+ * That is why this function takes a lambda parameter.
+ *
+ * Example of usage:
+ * ```
+ * val callbackEventsStream = produce {
+ * val disposable = registerChannelInCallback(channel)
+ * awaitClose { disposable.dispose() }
+ * }
+ * ```
+ */
+@ExperimentalCoroutinesApi
+public suspend fun <T> ProducerScope<T>.awaitClose(block: () -> Unit = {}) {
+ check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can be invoked only from the producer context" }
+ try {
+ suspendCancellableCoroutine<Unit> { cont ->
+ invokeOnClose {
+ cont.resume(Unit)
+ }
+ }
+ } finally {
+ block()
+ }
+}
+
+/**
* Launches new coroutine to produce a stream of values by sending them to a channel
* and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
* object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt
index 06a5c00..733bf63 100644
--- a/kotlinx-coroutines-core/common/src/flow/Builders.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt
@@ -200,38 +200,14 @@
}
/**
- * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
- * that is provided to the builder's [block] of code. It allows elements to be
- * produced by the code that is running in a different context, e.g. from a callback-based API.
- *
- * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
- * on the resulting flow. The [block] is not suspending deliberately, if you need suspending scope, [flow] builder
- * should be used instead.
- *
- * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
- * The provided channel can later be used by any external service to communicate with flow and its buffer determines
- * backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used).
- *
- * Example of usage:
- * ```
- * fun flowFrom(api: CallbackBasedApi): Flow<T> = flowViaChannel { channel ->
- * val callback = object : Callback { // implementation of some callback interface
- * override fun onNextValue(value: T) {
- * channel.offer(value) // Note: offer drops value when buffer is full
- * }
- * override fun onApiError(cause: Throwable) {
- * channel.cancel("API Error", CancellationException(cause))
- * }
- * override fun onCompleted() = channel.close()
- * }
- * api.register(callback)
- * channel.invokeOnClose {
- * api.unregister(callback)
- * }
- * }
- * ```
+ * @suppress
*/
@FlowPreview
+@Deprecated(
+ message = "Use channelFlow instead",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("channelFlow(bufferSize, block)")
+)
public fun <T> flowViaChannel(
bufferSize: Int = 16,
@BuilderInference block: CoroutineScope.(channel: SendChannel<T>) -> Unit
@@ -249,3 +225,90 @@
}
}
}
+
+/**
+ * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
+ * that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
+ * produced by the code that is running in a different context or running concurrently.
+ * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
+ * on the resulting flow.
+ *
+ * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used concurrently from different contexts.
+ * The resulting flow completes as soon as the [ProducerScope] completes; to artificially prolong it, [awaitClose] can be used.
+ * For a more detailed example, please refer to the [callbackFlow] documentation.
+ *
+ * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
+ * The provided channel can later be used by any external service to communicate with the flow and its buffer determines
+ * backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
+ *
+ * Examples of usage:
+ * ```
+ * fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
+ * launch {
+ * collect { value -> send(value) }
+ * }
+ * other.collect { value -> send(value) }
+ * }
+ *
+ * fun <T> contextualFlow(): Flow<T> = channelFlow {
+ * launch(Dispatchers.IO) {
+ * send(computeIoValue())
+ * }
+ *
+ * launch(Dispatchers.Default) {
+ * send(computeCpuValue())
+ * }
+ * }
+ * ```
+ */
+@FlowPreview
+public fun <T> channelFlow(bufferSize: Int = 16, @BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
+ flow {
+ coroutineScope {
+ val channel = produce(capacity = bufferSize, block = block)
+ channel.consumeEach { value ->
+ emit(value)
+ }
+ }
+ }
+
+/**
+ * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
+ * that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
+ * produced by the code that is running in a different context or running concurrently.
+ *
+ * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
+ * on the resulting flow.
+ *
+ * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used from any context,
+ * e.g. from a callback-based API. The flow completes as soon as its scope completes, so if you are using the channel from a
+ * callback-based API, the [awaitClose] extension should be used to artificially prolong the scope lifetime and to avoid memory leaks
+ * related to unregistered resources. The [awaitClose] argument is invoked either when the flow consumer cancels flow collection
+ * or when the callback-based API invokes [SendChannel.close] manually.
+ *
+ * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
+ * The provided channel can later be used by any external service to communicate with the flow and its buffer determines
+ * backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
+ *
+ * Example of usage:
+ * ```
+ * fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
+ * val callback = object : Callback { // implementation of some callback interface
+ * override fun onNextValue(value: T) {
+ * // Note: offer drops value when buffer is full
+ * // Channel.UNLIMITED can be used to avoid overfill
+ * offer(value)
+ * }
+ * override fun onApiError(cause: Throwable) {
+ * cancel(CancellationException("API Error", cause))
+ * }
+ * override fun onCompleted() = channel.close()
+ * }
+ * api.register(callback)
+ * // Suspends until either onCompleted is invoked or the flow collection is cancelled externally
+ * awaitClose { api.unregister(callback) }
+ * }
+ * ```
+ */
+public inline fun <T> callbackFlow(bufferSize: Int = 16, @BuilderInference crossinline block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
+ channelFlow(bufferSize) { block() }