blob: 6e18b38f55f0486c3c2c270194cbf75a61039531 [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.random.*
import kotlin.test.*
/**
* This test suite contains some basic tests for [SharedFlow]. There are some scenarios here written
* using [expect] and they are not very readable. See [SharedFlowScenarioTest] for a better
* behavioral test-suit.
*/
class SharedFlowTest : TestBase() {
@Test
fun testRendezvousSharedFlowBasic() = runTest {
expect(1)
val sh = MutableSharedFlow<Int?>()
assertTrue(sh.replayCache.isEmpty())
assertEquals(0, sh.subscriptionCount.value)
sh.emit(1) // no suspend
assertTrue(sh.replayCache.isEmpty())
assertEquals(0, sh.subscriptionCount.value)
expect(2)
// one collector
val job1 = launch(start = CoroutineStart.UNDISPATCHED) {
expect(3)
sh.collect {
when(it) {
4 -> expect(5)
6 -> expect(7)
10 -> expect(11)
13 -> expect(14)
else -> expectUnreached()
}
}
expectUnreached() // does not complete normally
}
expect(4)
assertEquals(1, sh.subscriptionCount.value)
sh.emit(4)
assertTrue(sh.replayCache.isEmpty())
expect(6)
sh.emit(6)
expect(8)
// one more collector
val job2 = launch(start = CoroutineStart.UNDISPATCHED) {
expect(9)
sh.collect {
when(it) {
10 -> expect(12)
13 -> expect(15)
17 -> expect(18)
null -> expect(20)
21 -> expect(22)
else -> expectUnreached()
}
}
expectUnreached() // does not complete normally
}
expect(10)
assertEquals(2, sh.subscriptionCount.value)
sh.emit(10) // to both collectors now!
assertTrue(sh.replayCache.isEmpty())
expect(13)
sh.emit(13)
expect(16)
job1.cancel() // cancel the first collector
yield()
assertEquals(1, sh.subscriptionCount.value)
expect(17)
sh.emit(17) // only to second collector
expect(19)
sh.emit(null) // emit null to the second collector
expect(21)
sh.emit(21) // non-null again
expect(23)
job2.cancel() // cancel the second collector
yield()
assertEquals(0, sh.subscriptionCount.value)
expect(24)
sh.emit(24) // does not go anywhere
assertEquals(0, sh.subscriptionCount.value)
assertTrue(sh.replayCache.isEmpty())
finish(25)
}
@Test
fun testRendezvousSharedFlowReset() = runTest {
expect(1)
val sh = MutableSharedFlow<Int>()
val barrier = Channel<Unit>(1)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
sh.collect {
when (it) {
3 -> {
expect(4)
barrier.receive() // hold on before collecting next one
}
6 -> expect(10)
else -> expectUnreached()
}
}
expectUnreached() // does not complete normally
}
expect(3)
sh.emit(3) // rendezvous
expect(5)
assertFalse(sh.tryEmit(5)) // collector is not ready now
launch(start = CoroutineStart.UNDISPATCHED) {
expect(6)
sh.emit(6) // suspends
expect(12)
}
expect(7)
yield() // no wakeup -> all suspended
expect(8)
// now reset cache -> nothing happens, there is no cache
sh.resetReplayCache()
yield()
expect(9)
// now resume collector
barrier.send(Unit)
yield() // to collector
expect(11)
yield() // to emitter
expect(13)
assertFalse(sh.tryEmit(13)) // rendezvous does not work this way
job.cancel()
finish(14)
}
@Test
fun testReplay1SharedFlowBasic() = runTest {
expect(1)
val sh = MutableSharedFlow<Int?>(1)
assertTrue(sh.replayCache.isEmpty())
assertEquals(0, sh.subscriptionCount.value)
sh.emit(1) // no suspend
assertEquals(listOf(1), sh.replayCache)
assertEquals(0, sh.subscriptionCount.value)
expect(2)
sh.emit(2) // no suspend
assertEquals(listOf(2), sh.replayCache)
expect(3)
// one collector
val job1 = launch(start = CoroutineStart.UNDISPATCHED) {
expect(4)
sh.collect {
when(it) {
2 -> expect(5) // got it immediately from replay cache
6 -> expect(8)
null -> expect(14)
17 -> expect(18)
else -> expectUnreached()
}
}
expectUnreached() // does not complete normally
}
expect(6)
assertEquals(1, sh.subscriptionCount.value)
sh.emit(6) // does not suspend, but buffers
assertEquals(listOf(6), sh.replayCache)
expect(7)
yield()
expect(9)
// one more collector
val job2 = launch(start = CoroutineStart.UNDISPATCHED) {
expect(10)
sh.collect {
when(it) {
6 -> expect(11) // from replay cache
null -> expect(15)
else -> expectUnreached()
}
}
expectUnreached() // does not complete normally
}
expect(12)
assertEquals(2, sh.subscriptionCount.value)
sh.emit(null)
expect(13)
assertEquals(listOf(null), sh.replayCache)
yield()
assertEquals(listOf(null), sh.replayCache)
expect(16)
job2.cancel()
yield()
assertEquals(1, sh.subscriptionCount.value)
expect(17)
sh.emit(17)
assertEquals(listOf(17), sh.replayCache)
yield()
expect(19)
job1.cancel()
yield()
assertEquals(0, sh.subscriptionCount.value)
assertEquals(listOf(17), sh.replayCache)
finish(20)
}
@Test
fun testReplay1() = runTest {
expect(1)
val sh = MutableSharedFlow<Int>(1)
assertEquals(listOf(), sh.replayCache)
val barrier = Channel<Unit>(1)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
sh.collect {
when (it) {
3 -> {
expect(4)
barrier.receive() // collector waits
}
5 -> expect(10)
6 -> expect(11)
else -> expectUnreached()
}
}
expectUnreached() // does not complete normally
}
expect(3)
assertTrue(sh.tryEmit(3)) // buffered
assertEquals(listOf(3), sh.replayCache)
yield() // to collector
expect(5)
assertTrue(sh.tryEmit(5)) // buffered
assertEquals(listOf(5), sh.replayCache)
launch(start = CoroutineStart.UNDISPATCHED) {
expect(6)
sh.emit(6) // buffer full, suspended
expect(13)
}
expect(7)
assertEquals(listOf(5), sh.replayCache)
sh.resetReplayCache() // clear cache
assertEquals(listOf(), sh.replayCache)
expect(8)
yield() // emitter still suspended
expect(9)
assertEquals(listOf(), sh.replayCache)
assertFalse(sh.tryEmit(10)) // still no buffer space
assertEquals(listOf(), sh.replayCache)
barrier.send(Unit) // resume collector
yield() // to collector
expect(12)
yield() // to emitter, that should have resumed
expect(14)
job.cancel()
assertEquals(listOf(6), sh.replayCache)
finish(15)
}
@Test
fun testReplay2Extra1() = runTest {
expect(1)
val sh = MutableSharedFlow<Int>(
replay = 2,
extraBufferCapacity = 1
)
assertEquals(listOf(), sh.replayCache)
assertTrue(sh.tryEmit(0))
assertEquals(listOf(0), sh.replayCache)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
var cnt = 0
sh.collect {
when (it) {
0 -> when (cnt++) {
0 -> expect(3)
1 -> expect(14)
else -> expectUnreached()
}
1 -> expect(6)
2 -> expect(7)
3 -> expect(8)
4 -> expect(12)
5 -> expect(13)
16 -> expect(17)
else -> expectUnreached()
}
}
expectUnreached() // does not complete normally
}
expect(4)
assertTrue(sh.tryEmit(1)) // buffered
assertEquals(listOf(0, 1), sh.replayCache)
assertTrue(sh.tryEmit(2)) // buffered
assertEquals(listOf(1, 2), sh.replayCache)
assertTrue(sh.tryEmit(3)) // buffered (buffer size is 3)
assertEquals(listOf(2, 3), sh.replayCache)
expect(5)
yield() // to collector
expect(9)
assertEquals(listOf(2, 3), sh.replayCache)
assertTrue(sh.tryEmit(4)) // can buffer now
assertEquals(listOf(3, 4), sh.replayCache)
assertTrue(sh.tryEmit(5)) // can buffer now
assertEquals(listOf(4, 5), sh.replayCache)
assertTrue(sh.tryEmit(0)) // can buffer one more, let it be zero again
assertEquals(listOf(5, 0), sh.replayCache)
expect(10)
assertFalse(sh.tryEmit(10)) // cannot buffer anymore!
sh.resetReplayCache() // replay cache
assertEquals(listOf(), sh.replayCache) // empty
assertFalse(sh.tryEmit(0)) // still cannot buffer anymore (reset does not help)
assertEquals(listOf(), sh.replayCache) // empty
expect(11)
yield() // resume collector, will get next values
expect(15)
sh.resetReplayCache() // reset again, nothing happens
assertEquals(listOf(), sh.replayCache) // empty
yield() // collector gets nothing -- no change
expect(16)
assertTrue(sh.tryEmit(16))
assertEquals(listOf(16), sh.replayCache)
yield() // gets it
expect(18)
job.cancel()
finish(19)
}
@Test
fun testBufferNoReplayCancelWhileBuffering() = runTest {
val n = 123
val sh = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = n)
repeat(3) {
val m = n / 2 // collect half, then suspend
val barrier = Channel<Int>(1)
val collectorJob = sh
.onSubscription {
barrier.send(1)
}
.onEach { value ->
if (value == m) {
barrier.send(2)
delay(Long.MAX_VALUE)
}
}
.launchIn(this)
assertEquals(1, barrier.receive()) // make sure it subscribes
launch(start = CoroutineStart.UNDISPATCHED) {
for (i in 0 until n + m) sh.emit(i) // these emits should go Ok
barrier.send(3)
sh.emit(n + 4) // this emit will suspend on buffer overflow
barrier.send(4)
}
assertEquals(2, barrier.receive()) // wait until m collected
assertEquals(3, barrier.receive()) // wait until all are emitted
collectorJob.cancel() // cancelling collector job must clear buffer and resume emitter
assertEquals(4, barrier.receive()) // verify that emitter resumes
}
}
@Test
fun testRepeatedResetWithReplay() = runTest {
val n = 10
val sh = MutableSharedFlow<Int>(n)
var i = 0
repeat(3) {
// collector is slow
val collector = sh.onEach { delay(Long.MAX_VALUE) }.launchIn(this)
val emitter = launch {
repeat(3 * n) { sh.emit(i); i++ }
}
repeat(3) { yield() } // enough to run it to suspension
assertEquals((i - n until i).toList(), sh.replayCache)
sh.resetReplayCache()
assertEquals(emptyList(), sh.replayCache)
repeat(3) { yield() } // enough to run it to suspension
assertEquals(emptyList(), sh.replayCache) // still blocked
collector.cancel()
emitter.cancel()
repeat(3) { yield() } // enough to run it to suspension
}
}
@Test
fun testSynchronousSharedFlowEmitterCancel() = runTest {
expect(1)
val sh = MutableSharedFlow<Int>()
val barrier1 = Job()
val barrier2 = Job()
val barrier3 = Job()
val collector1 = sh.onEach {
when (it) {
1 -> expect(3)
2 -> {
expect(6)
barrier2.complete()
}
3 -> {
expect(9)
barrier3.complete()
}
else -> expectUnreached()
}
}.launchIn(this)
val collector2 = sh.onEach {
when (it) {
1 -> {
expect(4)
barrier1.complete()
delay(Long.MAX_VALUE)
}
else -> expectUnreached()
}
}.launchIn(this)
repeat(2) { yield() } // launch both subscribers
val emitter = launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
sh.emit(1)
barrier1.join()
expect(5)
sh.emit(2) // suspends because of slow collector2
expectUnreached() // will be cancelled
}
barrier2.join() // wait
expect(7)
// Now cancel the emitter!
emitter.cancel()
yield()
// Cancel slow collector
collector2.cancel()
yield()
// emit to fast collector1
expect(8)
sh.emit(3)
barrier3.join()
expect(10)
// cancel it, too
collector1.cancel()
finish(11)
}
@Test
fun testDifferentBufferedFlowCapacities() = runTest {
if (isBoundByJsTestTimeout) return@runTest // Too slow for JS, bounded by 2 sec. default JS timeout
for (replay in 0..10) {
for (extraBufferCapacity in 0..5) {
if (replay == 0 && extraBufferCapacity == 0) continue // test only buffered shared flows
try {
val sh = MutableSharedFlow<Int>(replay, extraBufferCapacity)
// repeat the whole test a few times to make sure it works correctly when slots are reused
repeat(3) {
testBufferedFlow(sh, replay)
}
} catch (e: Throwable) {
error("Failed for replay=$replay, extraBufferCapacity=$extraBufferCapacity", e)
}
}
}
}
private suspend fun testBufferedFlow(sh: MutableSharedFlow<Int>, replay: Int) = withContext(Job()) {
reset()
expect(1)
val n = 100 // initially emitted to fill buffer
for (i in 1..n) assertTrue(sh.tryEmit(i))
// initial expected replayCache
val rcStart = n - replay + 1
val rcRange = rcStart..n
val rcSize = n - rcStart + 1
assertEquals(rcRange.toList(), sh.replayCache)
// create collectors
val m = 10 // collectors created
var ofs = 0
val k = 42 // emissions to collectors
val ecRange = n + 1..n + k
val jobs = List(m) { jobIndex ->
launch(start = CoroutineStart.UNDISPATCHED) {
sh.collect { i ->
when (i) {
in rcRange -> expect(2 + i - rcStart + jobIndex * rcSize)
in ecRange -> expect(2 + ofs + jobIndex)
else -> expectUnreached()
}
}
expectUnreached() // does not complete normally
}
}
ofs = rcSize * m + 2
expect(ofs)
// emit to all k times
for (p in ecRange) {
sh.emit(p)
expect(1 + ofs) // buffered, no suspend
yield()
ofs += 2 + m
expect(ofs)
}
assertEquals(ecRange.toList().takeLast(replay), sh.replayCache)
// cancel all collectors
jobs.forEach { it.cancel() }
yield()
// replay cache is still there
assertEquals(ecRange.toList().takeLast(replay), sh.replayCache)
finish(1 + ofs)
}
@Test
fun testDropLatest() = testDropLatestOrOldest(BufferOverflow.DROP_LATEST)
@Test
fun testDropOldest() = testDropLatestOrOldest(BufferOverflow.DROP_OLDEST)
private fun testDropLatestOrOldest(bufferOverflow: BufferOverflow) = runTest {
reset()
expect(1)
val sh = MutableSharedFlow<Int?>(1, onBufferOverflow = bufferOverflow)
sh.emit(1)
sh.emit(2)
// always keeps last w/o collectors
assertEquals(listOf(2), sh.replayCache)
assertEquals(0, sh.subscriptionCount.value)
// one collector
val valueAfterOverflow = when (bufferOverflow) {
BufferOverflow.DROP_OLDEST -> 5
BufferOverflow.DROP_LATEST -> 4
else -> error("not supported in this test: $bufferOverflow")
}
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
sh.collect {
when(it) {
2 -> { // replayed
expect(3)
yield() // and suspends, busy waiting
}
valueAfterOverflow -> expect(7)
8 -> expect(9)
else -> expectUnreached()
}
}
expectUnreached() // does not complete normally
}
expect(4)
assertEquals(1, sh.subscriptionCount.value)
assertEquals(listOf(2), sh.replayCache)
sh.emit(4) // buffering, collector is busy
assertEquals(listOf(4), sh.replayCache)
expect(5)
sh.emit(5) // Buffer overflow here, will not suspend
assertEquals(listOf(valueAfterOverflow), sh.replayCache)
expect(6)
yield() // to the job
expect(8)
sh.emit(8) // not busy now
assertEquals(listOf(8), sh.replayCache) // buffered
yield() // to process
expect(10)
job.cancel() // cancel the job
yield()
assertEquals(0, sh.subscriptionCount.value)
finish(11)
}
@Test
public fun testOnSubscription() = runTest {
expect(1)
val sh = MutableSharedFlow<String>()
fun share(s: String) { launch(start = CoroutineStart.UNDISPATCHED) { sh.emit(s) } }
sh
.onSubscription {
emit("collector->A")
share("share->A")
}
.onSubscription {
emit("collector->B")
share("share->B")
}
.onStart {
emit("collector->C")
share("share->C") // get's lost, no subscribers yet
}
.onStart {
emit("collector->D")
share("share->D") // get's lost, no subscribers yet
}
.onEach {
when (it) {
"collector->D" -> expect(2)
"collector->C" -> expect(3)
"collector->A" -> expect(4)
"collector->B" -> expect(5)
"share->A" -> expect(6)
"share->B" -> {
expect(7)
currentCoroutineContext().cancel()
}
else -> expectUnreached()
}
}
.launchIn(this)
.join()
finish(8)
}
@Test
@Suppress("DEPRECATION") // 'catch'
fun onSubscriptionThrows() = runTest {
expect(1)
val sh = MutableSharedFlow<String>(1)
sh.tryEmit("OK") // buffer a string
assertEquals(listOf("OK"), sh.replayCache)
sh
.onSubscription {
expect(2)
throw TestException()
}
.catch { e ->
assertTrue(e is TestException)
expect(3)
}
.collect {
// onSubscription throw before replay is emitted, so no value is collected if it throws
expectUnreached()
}
assertEquals(0, sh.subscriptionCount.value)
finish(4)
}
@Test
fun testBigReplayManySubscribers() = testManySubscribers(true)
@Test
fun testBigBufferManySubscribers() = testManySubscribers(false)
private fun testManySubscribers(replay: Boolean) = runTest {
val n = 100
val rnd = Random(replay.hashCode())
val sh = MutableSharedFlow<Int>(
replay = if (replay) n else 0,
extraBufferCapacity = if (replay) 0 else n
)
val subs = ArrayList<SubJob>()
for (i in 1..n) {
sh.emit(i)
val subBarrier = Channel<Unit>()
val subJob = SubJob()
subs += subJob
// will receive all starting from replay or from new emissions only
subJob.lastReceived = if (replay) 0 else i
subJob.job = sh
.onSubscription {
subBarrier.send(Unit) // signal subscribed
}
.onEach { value ->
assertEquals(subJob.lastReceived + 1, value)
subJob.lastReceived = value
}
.launchIn(this)
subBarrier.receive() // wait until subscribed
// must have also receive all from the replay buffer directly after being subscribed
assertEquals(subJob.lastReceived, i)
// 50% of time cancel one subscriber
if (i % 2 == 0) {
val victim = subs.removeAt(rnd.nextInt(subs.size))
yield() // make sure victim processed all emissions
assertEquals(victim.lastReceived, i)
victim.job.cancel()
}
}
yield() // make sure the last emission is processed
for (subJob in subs) {
assertEquals(subJob.lastReceived, n)
subJob.job.cancel()
}
}
private class SubJob {
lateinit var job: Job
var lastReceived = 0
}
@Test
fun testStateFlowModel() = runTest {
if (isBoundByJsTestTimeout) return@runTest // Too slow for JS, bounded by 2 sec. default JS timeout
val stateFlow = MutableStateFlow<Data?>(null)
val expect = modelLog(stateFlow)
val sharedFlow = MutableSharedFlow<Data?>(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
sharedFlow.tryEmit(null) // initial value
val actual = modelLog(sharedFlow) { distinctUntilChanged() }
for (i in 0 until minOf(expect.size, actual.size)) {
if (actual[i] != expect[i]) {
for (j in maxOf(0, i - 10)..i) println("Actual log item #$j: ${actual[j]}")
assertEquals(expect[i], actual[i], "Log item #$i")
}
}
assertEquals(expect.size, actual.size)
}
private suspend fun modelLog(
sh: MutableSharedFlow<Data?>,
op: Flow<Data?>.() -> Flow<Data?> = { this }
): List<String> = coroutineScope {
val rnd = Random(1)
val result = ArrayList<String>()
val job = launch {
sh.op().collect { value ->
result.add("Collect: $value")
repeat(rnd.nextInt(0..2)) {
result.add("Collect: yield")
yield()
}
}
}
repeat(1000) {
val value = if (rnd.nextBoolean()) null else rnd.nextData()
if (rnd.nextInt(20) == 0) {
result.add("resetReplayCache & emit: $value")
if (sh !is StateFlow<*>) sh.resetReplayCache()
assertTrue(sh.tryEmit(value))
} else {
result.add("Emit: $value")
sh.emit(value)
}
repeat(rnd.nextInt(0..2)) {
result.add("Emit: yield")
yield()
}
}
result.add("main: cancel")
job.cancel()
result.add("main: yield")
yield()
result.add("main: join")
job.join()
result
}
data class Data(val x: Int)
private val dataCache = (1..5).associateWith { Data(it) }
// Note that we test proper null support here, too
private fun Random.nextData(): Data? {
val x = nextInt(0..5)
if (x == 0) return null
// randomly reuse ref or create a new instance
return if(nextBoolean()) dataCache[x] else Data(x)
}
@Test
fun testOperatorFusion() {
val sh = MutableSharedFlow<String>()
assertSame(sh, (sh as Flow<*>).cancellable())
assertSame(sh, (sh as Flow<*>).flowOn(Dispatchers.Default))
assertSame(sh, sh.buffer(Channel.RENDEZVOUS))
}
@Test
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { MutableSharedFlow<Int>(-1) }
assertFailsWith<IllegalArgumentException> { MutableSharedFlow<Int>(0, extraBufferCapacity = -1) }
assertFailsWith<IllegalArgumentException> { MutableSharedFlow<Int>(0, onBufferOverflow = BufferOverflow.DROP_LATEST) }
assertFailsWith<IllegalArgumentException> { MutableSharedFlow<Int>(0, onBufferOverflow = BufferOverflow.DROP_OLDEST) }
}
@Test
public fun testReplayCancellability() = testCancellability(fromReplay = true)
@Test
public fun testEmitCancellability() = testCancellability(fromReplay = false)
private fun testCancellability(fromReplay: Boolean) = runTest {
expect(1)
val sh = MutableSharedFlow<Int>(5)
fun emitTestData() {
for (i in 1..5) assertTrue(sh.tryEmit(i))
}
if (fromReplay) emitTestData() // fill in replay first
var subscribed = true
val job = sh
.onSubscription { subscribed = true }
.onEach { i ->
when (i) {
1 -> expect(2)
2 -> expect(3)
3 -> {
expect(4)
currentCoroutineContext().cancel()
}
else -> expectUnreached() // shall check for cancellation
}
}
.launchIn(this)
yield()
assertTrue(subscribed) // yielding in enough
if (!fromReplay) emitTestData() // emit after subscription
job.join()
finish(5)
}
}