/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
import kotlin.time.*
/**
 * Returns a flow that mirrors the original flow, but filters out values
 * that are followed by newer values within the given [timeout][timeoutMillis].
 * The latest value is always emitted.
 *
 * Example:
 * ```kotlin
 * flow {
 *     emit(1)
 *     delay(90)
 *     emit(2)
 *     delay(90)
 *     emit(3)
 *     delay(1010)
 *     emit(4)
 *     delay(1010)
 *     emit(5)
 * }.debounce(1000)
 * ```
 * produces the following emissions
 * ```text
 * 3, 4, 5
 * ```
 *
 * Note that the resulting flow does not emit anything as long as the original flow emits
 * items faster than every [timeoutMillis] milliseconds.
 *
 * @param timeoutMillis debounce timeout in milliseconds; `0` returns this flow unchanged.
 * @throws IllegalArgumentException if [timeoutMillis] is negative.
 */
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
    require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
    // Zero timeout: skip the debouncing machinery and hand back the receiver itself.
    if (timeoutMillis == 0L) return this
    // Constant selector: every value gets the same timeout.
    return debounceInternal { timeoutMillis }
}
/**
 * A variation of [debounce] that allows specifying the timeout value dynamically.
 *
 * Example:
 * ```kotlin
 * flow {
 *     emit(1)
 *     delay(90)
 *     emit(2)
 *     delay(90)
 *     emit(3)
 *     delay(1010)
 *     emit(4)
 *     delay(1010)
 *     emit(5)
 * }.debounce {
 *     if (it == 1) {
 *         0L
 *     } else {
 *         1000L
 *     }
 * }
 * ```
 * produces the following emissions
 * ```text
 * 1, 3, 4, 5
 * ```
 *
 * @param timeoutMillis [T] is the emitted value and the return value is timeout in milliseconds.
 */
// Lets the compiler choose between the `(T) -> Long` and `(T) -> Duration` overloads of `debounce`
// by the lambda's return type; without it a call such as `debounce { 1000L }` is ambiguous.
@OptIn(kotlin.experimental.ExperimentalTypeInference::class)
@OverloadResolutionByLambdaReturnType
public fun <T> Flow<T>.debounce(timeoutMillis: (T) -> Long): Flow<T> =
    debounceInternal(timeoutMillis)
/**
 * Returns a flow that mirrors the original flow, but filters out values
 * that are followed by newer values within the given [timeout].
 * The latest value is always emitted.
 *
 * Example:
 * ```kotlin
 * flow {
 *     emit(1)
 *     delay(90.milliseconds)
 *     emit(2)
 *     delay(90.milliseconds)
 *     emit(3)
 *     delay(1010.milliseconds)
 *     emit(4)
 *     delay(1010.milliseconds)
 *     emit(5)
 * }.debounce(1000.milliseconds)
 * ```
 * produces the following emissions
 * ```text
 * 3, 4, 5
 * ```
 *
 * @param timeout debounce timeout; it is converted to milliseconds and passed to the `Long`-based overload.
 */
// NOTE(review): on Kotlin versions where kotlin.time.Duration is still experimental this declaration
// needs @ExperimentalTime (or a module-wide opt-in) — confirm against the build's Kotlin version.
public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> =
    debounce(timeout.toDelayMillis())
/**
 * A variation of [debounce] that allows specifying the timeout value dynamically.
 *
 * Example:
 * ```kotlin
 * flow {
 *     emit(1)
 *     delay(90.milliseconds)
 *     emit(2)
 *     delay(90.milliseconds)
 *     emit(3)
 *     delay(1010.milliseconds)
 *     emit(4)
 *     delay(1010.milliseconds)
 *     emit(5)
 * }.debounce {
 *     if (it == 1) {
 *         0.milliseconds
 *     } else {
 *         1000.milliseconds
 *     }
 * }
 * ```
 * produces the following emissions
 * ```text
 * 1, 3, 4, 5
 * ```
 *
 * @param timeout [T] is the emitted value and the return value is timeout in [Duration].
 */
// NOTE(review): on Kotlin versions where kotlin.time.Duration is still experimental this declaration
// needs @ExperimentalTime (or a module-wide opt-in) — confirm against the build's Kotlin version.
//
// The `(T) -> Long` and `(T) -> Duration` overloads erase to the same JVM signature, so this one
// gets its own JVM name; the overload-resolution annotation lets call sites pick between them by
// the lambda's return type.
@JvmName("debounceDuration")
@OptIn(kotlin.experimental.ExperimentalTypeInference::class)
@OverloadResolutionByLambdaReturnType
public fun <T> Flow<T>.debounce(timeout: (T) -> Duration): Flow<T> =
    debounceInternal { emittedItem ->
        // Convert the per-item Duration into the millisecond timeout debounceInternal expects.
        timeout(emittedItem).toDelayMillis()
    }
/**
 * Shared implementation behind the [debounce] overloads.
 *
 * A child producer pumps upstream values into a channel. The loop below keeps the most recent
 * value that has not been emitted yet in `lastValue` and, in a `select`, races the arrival of the
 * next value against the expiry of the timeout computed for the pending one. Whichever value is
 * still pending when its timeout fires, or when upstream completes, is emitted downstream.
 *
 * @param timeoutMillisSelector computes the timeout in milliseconds for a received value;
 * a negative result fails with [IllegalArgumentException], and `0` emits the value immediately.
 */
private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long): Flow<T> =
    scopedFlow { downstream ->
        // Produce the values using the default (rendezvous) channel
        // Note: the actual type is Any, KT-30796
        val values = produce<Any?> {
            // `null` is used below as the "channel closed" signal, so null elements travel as NULL.
            collect { value -> send(value ?: NULL) }
        }
        // Now consume the values
        var lastValue: Any? = null
        while (lastValue !== DONE) {
            var timeoutMillis = 0L // will be always computed when lastValue != null
            // Compute timeout for this value
            if (lastValue != null) {
                timeoutMillis = timeoutMillisSelector(NULL.unbox(lastValue))
                require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
                if (timeoutMillis == 0L) {
                    // Zero timeout: the value goes downstream right away instead of waiting.
                    downstream.emit(NULL.unbox(lastValue))
                    lastValue = null // Consume the value
                }
            }
            // assert invariant: lastValue != null implies timeoutMillis > 0
            assert { lastValue == null || timeoutMillis > 0 }
            // wait for the next value with timeout
            select<Unit> {
                // Set timeout when lastValue exists and is not consumed yet
                if (lastValue != null) {
                    onTimeout(timeoutMillis) {
                        // No newer value arrived within the timeout: the pending value wins.
                        downstream.emit(NULL.unbox(lastValue))
                        lastValue = null // Consume the value
                    }
                }
                // Should be receiveOrClosed when boxing issues are fixed
                values.onReceiveOrNull { value ->
                    if (value == null) {
                        // Upstream completed: flush the pending value (if any) and leave the loop.
                        if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
                        lastValue = DONE
                    } else {
                        // A newer value replaces the pending one; its timeout is computed next iteration.
                        lastValue = value
                    }
                }
            }
        }
    }
/**
 * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
 *
 * Example:
 * ```kotlin
 * flow {
 *     repeat(10) {
 *         emit(it)
 *         delay(110)
 *     }
 * }.sample(200)
 * ```
 * produces the following emissions
 * ```text
 * 1, 3, 5, 7, 9
 * ```
 *
 * Note that the latest element is not emitted if it does not fit into the sampling window.
 *
 * @param periodMillis sampling period in milliseconds; must be positive.
 * @throws IllegalArgumentException if [periodMillis] is not positive.
 */
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
    require(periodMillis > 0) { "Sample period should be positive" }
    return scopedFlow { downstream ->
        // Conflated channel: only the most recent upstream value is kept between receives.
        val values = produce<Any?>(capacity = Channel.CONFLATED) {
            // Actually Any, KT-30796
            collect { value -> send(value ?: NULL) }
        }
        var lastValue: Any? = null
        val ticker = fixedPeriodTicker(periodMillis)
        while (lastValue !== DONE) {
            select<Unit> {
                values.onReceiveOrNull {
                    if (it == null) {
                        // Upstream completed. The ticker loops forever, so it must be stopped
                        // explicitly or the enclosing scope would never finish.
                        // NOTE(review): ChildCancelledException is assumed to come from
                        // kotlinx.coroutines.flow.internal (star-imported by this file) and to stop
                        // the child without cancelling the enclosing flow scope — confirm.
                        ticker.cancel(ChildCancelledException())
                        lastValue = DONE
                    } else {
                        lastValue = it
                    }
                }
                // todo: shall we start sampling only when an element arrives or sample always as here?
                ticker.onReceive {
                    // Nothing new since the previous tick: skip this tick.
                    val value = lastValue ?: return@onReceive
                    lastValue = null // Consume the value
                    downstream.emit(NULL.unbox(value))
                }
            }
        }
    }
}
/**
 * Starts a producer in this scope that sends [Unit] into a rendezvous channel (`capacity = 0`):
 * the first tick after [initialDelayMillis], then one tick every [delayMillis] after the previous
 * tick has been received. The producer never completes on its own; the caller has to cancel it.
 *
 * TODO this design (and design of the corresponding operator) depends on #540
 *
 * @param delayMillis delay between consecutive ticks in milliseconds; must be non-negative.
 * @param initialDelayMillis delay before the first tick in milliseconds; must be non-negative.
 * @throws IllegalArgumentException if either delay is negative.
 */
internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel<Unit> {
    require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
    require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
    return produce(capacity = 0) {
        delay(initialDelayMillis)
        while (true) {
            // Suspends until a receiver takes the tick, so ticks never pile up.
            channel.send(Unit)
            delay(delayMillis)
        }
    }
}
/**
 * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period].
 *
 * Example:
 * ```kotlin
 * flow {
 *     repeat(10) {
 *         emit(it)
 *         delay(110.milliseconds)
 *     }
 * }.sample(200.milliseconds)
 * ```
 * produces the following emissions
 * ```text
 * 1, 3, 5, 7, 9
 * ```
 *
 * Note that the latest element is not emitted if it does not fit into the sampling window.
 *
 * @param period sampling period; it is converted to milliseconds and passed to the `Long`-based overload.
 */
// NOTE(review): on Kotlin versions where kotlin.time.Duration is still experimental this declaration
// needs @ExperimentalTime (or a module-wide opt-in) — confirm against the build's Kotlin version.
public fun <T> Flow<T>.sample(period: Duration): Flow<T> = sample(period.toDelayMillis())