Optimize debounce operator allocation pressure by using conflated produce. Previously it was not possible due to not implemented #1235
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
index dbd7120..c6ff12f 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/NullSurrogate.kt
@@ -14,3 +14,11 @@
 @JvmField
 @SharedImmutable
 internal val NULL = Symbol("NULL")
+
+/*
+ * Symbol used to indicate that the flow is complete.
+ * It should never leak to the outside world.
+ */
+@JvmField
+@SharedImmutable
+internal val DONE = Symbol("DONE")
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
index f2f1cd9..85b9b07 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
@@ -42,18 +42,21 @@
 public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
     require(timeoutMillis > 0) { "Debounce timeout should be positive" }
     return scopedFlow { downstream ->
-        val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
-        // Channel is not closed deliberately as there is no close with value
-        val collector = async {
-            collect { value -> values.send(value ?: NULL) }
+        // Actually Any, KT-30796
+        val values = produce<Any?>(capacity = Channel.CONFLATED) {
+            collect { value -> send(value ?: NULL) }
         }
-
-        var isDone = false
         var lastValue: Any? = null
-        while (!isDone) {
+        while (lastValue !== DONE) {
             select<Unit> {
-                values.onReceive {
-                    lastValue = it
+                // Should be receiveOrClosed when boxing issues are fixed
+                values.onReceiveOrNull {
+                    if (it == null) {
+                        if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
+                        lastValue = DONE
+                    } else {
+                        lastValue = it
+                    }
                 }
 
                 lastValue?.let { value ->
@@ -63,12 +66,6 @@
                         downstream.emit(NULL.unbox(value))
                     }
                 }
-
-                // Close with value 'idiom'
-                collector.onAwait {
-                    if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
-                    isDone = true
-                }
             }
         }
     }
@@ -98,16 +95,14 @@
             // Actually Any, KT-30796
             collect { value -> send(value ?: NULL) }
         }
-
-        var isDone = false
         var lastValue: Any? = null
         val ticker = fixedPeriodTicker(periodMillis)
-        while (!isDone) {
+        while (lastValue !== DONE) {
             select<Unit> {
                 values.onReceiveOrNull {
                     if (it == null) {
                         ticker.cancel(ChildCancelledException())
-                        isDone = true
+                        lastValue = DONE
                     } else {
                         lastValue = it
                     }