Optimize combine operator
* Avoid linear scans over all flows on every received value (track closed and not-yet-set counts instead)
* Reduce bytecode size
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
index f7edad0..8824095 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -49,28 +49,26 @@
flows: Array<out Flow<T>>,
arrayFactory: () -> Array<T?>,
transform: suspend FlowCollector<R>.(Array<T>) -> Unit
-) {
- coroutineScope {
- val size = flows.size
- val channels =
- Array(size) { asFairChannel(flows[it]) }
- val latestValues = arrayOfNulls<Any?>(size)
- val isClosed = Array(size) { false }
-
- // See flow.combine(other) for explanation.
- while (!isClosed.all { it }) {
- select<Unit> {
- for (i in 0 until size) {
- onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
- latestValues[i] = value
- if (latestValues.all { it !== null }) {
- val arguments = arrayFactory()
- for (index in 0 until size) {
- arguments[index] = NULL.unbox(latestValues[index])
- }
- transform(arguments as Array<T>)
- }
+): Unit = coroutineScope {
+ val size = flows.size
+ val channels = Array(size) { asFairChannel(flows[it]) }
+ val latestValues = arrayOfNulls<Any?>(size)
+ val isClosed = Array(size) { false }
+ var nonClosed = size
+ var remainingNulls = size
+ // See flow.combine(other) for explanation.
+ while (nonClosed != 0) {
+ select<Unit> {
+ for (i in 0 until size) {
+ onReceive(isClosed[i], channels[i], { isClosed[i] = true; --nonClosed }) { value ->
+ if (latestValues[i] == null) --remainingNulls
+ latestValues[i] = value
+ if (remainingNulls != 0) return@onReceive
+ val arguments = arrayFactory()
+ for (index in 0 until size) {
+ arguments[index] = NULL.unbox(latestValues[index])
}
+ transform(arguments as Array<T>)
}
}
}
@@ -84,6 +82,7 @@
noinline onReceive: suspend (value: Any) -> Unit
) {
if (isClosed) return
+ @Suppress("DEPRECATION")
channel.onReceiveOrNull {
// TODO onReceiveOrClosed when boxing issues are fixed
if (it === null) onClosed()