blob: f2fb41a5891252a32e5a9893b349fca73b604b30 [file] [log] [blame]
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlin.random.*
import kotlin.test.*
// A simplified version of StateFlowStressTest
class StateFlowCommonStressTest : TestBase() {
private val state = MutableStateFlow<Long>(0)
@Test
fun testSingleEmitterAndCollector() = runMtTest {
var collected = 0L
val collector = launch(Dispatchers.Default) {
// collect, but abort and collect again after every 1000 values to stress allocation/deallocation
do {
val batchSize = Random.nextInt(1..1000)
var index = 0
val cnt = state.onEach { value ->
// the first value in batch is allowed to repeat, but cannot go back
val ok = if (index++ == 0) value >= collected else value > collected
check(ok) {
"Values must be monotonic, but $value is not, was $collected"
}
collected = value
}.take(batchSize).map { 1 }.sum()
} while (cnt == batchSize)
}
var current = 1L
val emitter = launch {
while (true) {
state.value = current++
if (current % 1000 == 0L) yield() // make it cancellable
}
}
delay(3000)
emitter.cancelAndJoin()
collector.cancelAndJoin()
assertTrue { current >= collected / 2 }
}
}