blob: dc3cd43c9393ea51fffe51c579af7248318045d2 [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import org.junit.*
import kotlin.random.*
/**
 * Stress test that hammers a single [MutableStateFlow] with concurrent writers and readers.
 *
 * Emitter coroutines keep assigning ever-increasing values to the shared [state], while collector
 * coroutines repeatedly collect it in short, randomly sized batches. Every collector verifies that
 * the values it observes from each individual emitter never go backwards, and at the end of the run
 * every collector must have kept up with at least half of what each emitter produced.
 */
class StateFlowStressTest : TestBase() {
    // Duration of the stress run in seconds, scaled by the inherited stressTestMultiplier.
    private val nSeconds = 3 * stressTestMultiplier

    // The state shared by all emitters and collectors. Each emitted value encodes both the emitter
    // index and its per-emitter counter as `counter * nEmitters + emitterIndex`.
    private val state = MutableStateFlow<Long>(0)

    // Thread pool the emitters and collectors run on; created at the start of stress().
    private lateinit var pool: ExecutorCoroutineDispatcher

    @After
    fun tearDown() {
        // The pool is only assigned inside stress(). If the test failed before that point, closing it
        // unconditionally would throw UninitializedPropertyAccessException and mask the real failure.
        if (::pool.isInitialized) pool.close()
    }

    /**
     * Runs [nEmitters] writer coroutines and [nCollectors] reader coroutines against [state] for
     * [nSeconds] seconds, printing progress once per second, then cancels everything and checks
     * that no collector has stalled.
     *
     * @param nEmitters number of coroutines concurrently assigning `state.value`
     * @param nCollectors number of coroutines concurrently collecting `state`
     * @throws IllegalStateException from a collector if it observes a non-monotonic value
     * @throws IllegalArgumentException if, at the end, some collector saw no more than half of the
     *   values produced by some emitter
     */
    fun stress(nEmitters: Int, nCollectors: Int) = runTest {
        pool = newFixedThreadPoolContext(nEmitters + nCollectors, "StateFlowStressTest")
        // collected[collector][emitter] is the latest counter that collector has seen from that emitter
        val collected = Array(nCollectors) { LongArray(nEmitters) }
        val collectors = launch {
            repeat(nCollectors) { collector ->
                launch(pool) {
                    val c = collected[collector]
                    // collect, but abort and collect again after a random batch of up to 1000 values
                    // to stress allocation/deallocation of collectors
                    do {
                        val batchSize = Random.nextInt(1..1000)
                        var index = 0
                        val cnt = state.onEach { value ->
                            // decode which emitter produced the value and its per-emitter counter
                            val emitter = (value % nEmitters).toInt()
                            val current = value / nEmitters
                            // the first value in batch is allowed to repeat, but cannot go back
                            val ok = if (index++ == 0) current >= c[emitter] else current > c[emitter]
                            check(ok) {
                                "Values must be monotonic, but $current is not, " +
                                    "was ${c[emitter]} in collector #$collector from emitter #$emitter"
                            }
                            c[emitter] = current
                        }.take(batchSize).map { 1 }.sum()
                        // a full batch means the flow is still live; start a fresh collection
                    } while (cnt == batchSize)
                }
            }
        }
        // emitted[emitter] is the latest counter that emitter has published
        val emitted = LongArray(nEmitters)
        val emitters = launch {
            repeat(nEmitters) { emitter ->
                launch(pool) {
                    var current = 1L
                    while (true) {
                        state.value = current * nEmitters + emitter
                        emitted[emitter] = current
                        current++
                        if (current % 1000 == 0L) yield() // make it cancellable
                    }
                }
            }
        }
        // let the stress run, reporting progress once per second
        for (second in 1..nSeconds) {
            delay(1000)
            val cs = collected.map { it.sum() }
            println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}")
        }
        emitters.cancelAndJoin()
        collectors.cancelAndJoin()
        // make sure nothing hung: every collector must have seen more than half of each emitter's output
        for ((collector, seen) in collected.withIndex()) {
            for ((emitter, current) in seen.withIndex()) {
                require(current > emitted[emitter] / 2) {
                    "Collector #$collector fell behind: last saw $current from emitter #$emitter, " +
                        "which has emitted ${emitted[emitter]}"
                }
            }
        }
    }

    @Test
    fun testSingleEmitterAndCollector() = stress(1, 1)

    @Test
    fun testTenEmittersAndCollectors() = stress(10, 10)
}