Optimize debounce operator allocation pressure by using conflated produce. Previously it was not possible due to not implemented #1235
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
index dbd7120..c6ff12f 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
@@ -14,3 +14,11 @@
@JvmField
@SharedImmutable
internal val NULL = Symbol("NULL")
+
+/*
+ * Symbol used to indicate that the flow is complete.
+ * It should never leak to the outside world.
+ */
+@JvmField
+@SharedImmutable
+internal val DONE = Symbol("DONE")
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
index f2f1cd9..85b9b07 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
@@ -42,18 +42,21 @@
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
return scopedFlow { downstream ->
- val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
- // Channel is not closed deliberately as there is no close with value
- val collector = async {
- collect { value -> values.send(value ?: NULL) }
+ // Actually Any, KT-30796
+ val values = produce<Any?>(capacity = Channel.CONFLATED) {
+ collect { value -> send(value ?: NULL) }
}
-
- var isDone = false
var lastValue: Any? = null
- while (!isDone) {
+ while (lastValue !== DONE) {
select<Unit> {
- values.onReceive {
- lastValue = it
+ // Should be receiveOrClosed when boxing issues are fixed
+ values.onReceiveOrNull {
+ if (it == null) {
+ if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
+ lastValue = DONE
+ } else {
+ lastValue = it
+ }
}
lastValue?.let { value ->
@@ -63,12 +66,6 @@
downstream.emit(NULL.unbox(value))
}
}
-
- // Close with value 'idiom'
- collector.onAwait {
- if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
- isDone = true
- }
}
}
}
@@ -98,16 +95,14 @@
// Actually Any, KT-30796
collect { value -> send(value ?: NULL) }
}
-
- var isDone = false
var lastValue: Any? = null
val ticker = fixedPeriodTicker(periodMillis)
- while (!isDone) {
+ while (lastValue !== DONE) {
select<Unit> {
values.onReceiveOrNull {
if (it == null) {
ticker.cancel(ChildCancelledException())
- isDone = true
+ lastValue = DONE
} else {
lastValue = it
}