blob: 794553b4826348880845712b9bfeeff71bbd7e2f [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.*
import kotlin.test.*
/**
* This test suit for [SharedFlow] has a dense framework that allows to test complex
* suspend/resume scenarios while keeping the code readable. Each test here is for
* one specific [SharedFlow] configuration, testing all the various corner cases in its
* behavior.
*/
class SharedFlowScenarioTest : TestBase() {
@Test
fun testReplay1Extra2() =
testSharedFlow(MutableSharedFlow<Int>(1, 2)) {
// total buffer size == 3
expectReplayOf()
emitRightNow(1); expectReplayOf(1)
emitRightNow(2); expectReplayOf(2)
emitRightNow(3); expectReplayOf(3)
emitRightNow(4); expectReplayOf(4) // no prob - no subscribers
val a = subscribe("a"); collect(a, 4)
emitRightNow(5); expectReplayOf(5)
emitRightNow(6); expectReplayOf(6)
emitRightNow(7); expectReplayOf(7)
// suspend/collect sequentially
val e8 = emitSuspends(8); collect(a, 5); emitResumes(e8); expectReplayOf(8)
val e9 = emitSuspends(9); collect(a, 6); emitResumes(e9); expectReplayOf(9)
// buffer full, but parallel emitters can still suspend (queue up)
val e10 = emitSuspends(10)
val e11 = emitSuspends(11)
val e12 = emitSuspends(12)
collect(a, 7); emitResumes(e10); expectReplayOf(10) // buffer 8, 9 | 10
collect(a, 8); emitResumes(e11); expectReplayOf(11) // buffer 9, 10 | 11
sharedFlow.resetReplayCache(); expectReplayOf() // 9, 10 11 | no replay
collect(a, 9); emitResumes(e12); expectReplayOf(12)
collect(a, 10, 11, 12); expectReplayOf(12) // buffer empty | 12
emitRightNow(13); expectReplayOf(13)
emitRightNow(14); expectReplayOf(14)
emitRightNow(15); expectReplayOf(15) // buffer 13, 14 | 15
val e16 = emitSuspends(16)
val e17 = emitSuspends(17)
val e18 = emitSuspends(18)
cancel(e17); expectReplayOf(15) // cancel in the middle of three emits; buffer 13, 14 | 15
collect(a, 13); emitResumes(e16); expectReplayOf(16) // buffer 14, 15, | 16
collect(a, 14); emitResumes(e18); expectReplayOf(18) // buffer 15, 16 | 18
val e19 = emitSuspends(19)
val e20 = emitSuspends(20)
val e21 = emitSuspends(21)
cancel(e21); expectReplayOf(18) // cancel last emit; buffer 15, 16, 18
collect(a, 15); emitResumes(e19); expectReplayOf(19) // buffer 16, 18 | 19
collect(a, 16); emitResumes(e20); expectReplayOf(20) // buffer 18, 19 | 20
collect(a, 18, 19, 20); expectReplayOf(20) // buffer empty | 20
emitRightNow(22); expectReplayOf(22)
emitRightNow(23); expectReplayOf(23)
emitRightNow(24); expectReplayOf(24) // buffer 22, 23 | 24
val e25 = emitSuspends(25)
val e26 = emitSuspends(26)
val e27 = emitSuspends(27)
cancel(e25); expectReplayOf(24) // cancel first emit, buffer 22, 23 | 24
sharedFlow.resetReplayCache(); expectReplayOf() // buffer 22, 23, 24 | no replay
val b = subscribe("b") // new subscriber
collect(a, 22); emitResumes(e26); expectReplayOf(26) // buffer 23, 24 | 26
collect(b, 26)
collect(a, 23); emitResumes(e27); expectReplayOf(27) // buffer 24, 26 | 27
collect(a, 24, 26, 27) // buffer empty | 27
emitRightNow(28); expectReplayOf(28)
emitRightNow(29); expectReplayOf(29) // buffer 27, 28 | 29
collect(a, 28, 29) // but b is slow
val e30 = emitSuspends(30)
val e31 = emitSuspends(31)
val e32 = emitSuspends(32)
val e33 = emitSuspends(33)
val e34 = emitSuspends(34)
val e35 = emitSuspends(35)
val e36 = emitSuspends(36)
val e37 = emitSuspends(37)
val e38 = emitSuspends(38)
val e39 = emitSuspends(39)
cancel(e31) // cancel emitter in queue
cancel(b) // cancel slow subscriber -> 3 emitters resume
emitResumes(e30); emitResumes(e32); emitResumes(e33); expectReplayOf(33) // buffer 30, 32 | 33
val c = subscribe("c"); collect(c, 33) // replays
cancel(e34)
collect(a, 30); emitResumes(e35); expectReplayOf(35) // buffer 32, 33 | 35
cancel(e37)
cancel(a); emitResumes(e36); emitResumes(e38); expectReplayOf(38) // buffer 35, 36 | 38
collect(c, 35); emitResumes(e39); expectReplayOf(39) // buffer 36, 38 | 39
collect(c, 36, 38, 39); expectReplayOf(39)
cancel(c); expectReplayOf(39) // replay stays
}
@Test
fun testReplay1() =
testSharedFlow(MutableSharedFlow<Int>(1)) {
emitRightNow(0); expectReplayOf(0)
emitRightNow(1); expectReplayOf(1)
emitRightNow(2); expectReplayOf(2)
sharedFlow.resetReplayCache(); expectReplayOf()
sharedFlow.resetReplayCache(); expectReplayOf()
emitRightNow(3); expectReplayOf(3)
emitRightNow(4); expectReplayOf(4)
val a = subscribe("a"); collect(a, 4)
emitRightNow(5); expectReplayOf(5); collect(a, 5)
emitRightNow(6)
sharedFlow.resetReplayCache(); expectReplayOf()
sharedFlow.resetReplayCache(); expectReplayOf()
val e7 = emitSuspends(7)
val e8 = emitSuspends(8)
val e9 = emitSuspends(9)
collect(a, 6); emitResumes(e7); expectReplayOf(7)
sharedFlow.resetReplayCache(); expectReplayOf()
sharedFlow.resetReplayCache(); expectReplayOf() // buffer 7 | -- no replay, but still buffered
val b = subscribe("b")
collect(a, 7); emitResumes(e8); expectReplayOf(8)
collect(b, 8) // buffer | 8 -- a is slow
val e10 = emitSuspends(10)
val e11 = emitSuspends(11)
val e12 = emitSuspends(12)
cancel(e9)
collect(a, 8); emitResumes(e10); expectReplayOf(10)
collect(a, 10) // now b's slow
cancel(e11)
collect(b, 10); emitResumes(e12); expectReplayOf(12)
collect(a, 12)
collect(b, 12)
sharedFlow.resetReplayCache(); expectReplayOf()
sharedFlow.resetReplayCache(); expectReplayOf() // nothing is buffered -- both collectors up to date
emitRightNow(13); expectReplayOf(13)
collect(b, 13) // a is slow
val e14 = emitSuspends(14)
val e15 = emitSuspends(15)
val e16 = emitSuspends(16)
cancel(e14)
cancel(a); emitResumes(e15); expectReplayOf(15) // cancelling slow subscriber
collect(b, 15); emitResumes(e16); expectReplayOf(16)
collect(b, 16)
}
@Test
fun testReplay2Extra2DropOldest() =
testSharedFlow<Int>(MutableSharedFlow(2, 2, BufferOverflow.DROP_OLDEST)) {
emitRightNow(0); expectReplayOf(0)
emitRightNow(1); expectReplayOf(0, 1)
emitRightNow(2); expectReplayOf(1, 2)
emitRightNow(3); expectReplayOf(2, 3)
emitRightNow(4); expectReplayOf(3, 4)
val a = subscribe("a")
collect(a, 3)
emitRightNow(5); expectReplayOf(4, 5)
emitRightNow(6); expectReplayOf(5, 6)
emitRightNow(7); expectReplayOf(6, 7) // buffer 4, 5 | 6, 7
emitRightNow(8); expectReplayOf(7, 8) // buffer 5, 6 | 7, 8
emitRightNow(9); expectReplayOf(8, 9) // buffer 6, 7 | 8, 9
collect(a, 6, 7)
val b = subscribe("b")
collect(b, 8, 9) // buffer | 8, 9
emitRightNow(10); expectReplayOf(9, 10) // buffer 8 | 9, 10
collect(a, 8, 9, 10) // buffer | 9, 10, note "b" had not collected 10 yet
emitRightNow(11); expectReplayOf(10, 11) // buffer | 10, 11
emitRightNow(12); expectReplayOf(11, 12) // buffer 10 | 11, 12
emitRightNow(13); expectReplayOf(12, 13) // buffer 10, 11 | 12, 13
emitRightNow(14); expectReplayOf(13, 14) // buffer 11, 12 | 13, 14, "b" missed 10
collect(b, 11, 12, 13, 14)
sharedFlow.resetReplayCache(); expectReplayOf() // buffer 11, 12, 13, 14 |
sharedFlow.resetReplayCache(); expectReplayOf()
collect(a, 11, 12, 13, 14)
emitRightNow(15); expectReplayOf(15)
collect(a, 15)
collect(b, 15)
}
@Test // https://github.com/Kotlin/kotlinx.coroutines/issues/2320
fun testResumeFastSubscriberOnResumedEmitter() =
testSharedFlow<Int>(MutableSharedFlow(1)) {
// create two subscribers and start collecting
val s1 = subscribe("s1"); resumeCollecting(s1)
val s2 = subscribe("s2"); resumeCollecting(s2)
// now emit 0, make sure it is collected
emitRightNow(0); expectReplayOf(0)
awaitCollected(s1, 0)
awaitCollected(s2, 0)
// now emit 1, and only first subscriber continues and collects it
emitRightNow(1); expectReplayOf(1)
collect(s1, 1)
// now emit 2, it suspend (s2 is blocking it)
val e2 = emitSuspends(2)
resumeCollecting(s1) // resume, but does not collect (e2 is still queued)
collect(s2, 1) // resume + collect next --> resumes emitter, thus resumes s1
awaitCollected(s1, 2) // <-- S1 collects value from the newly resumed emitter here !!!
emitResumes(e2); expectReplayOf(2)
// now emit 3, it suspends (s2 blocks it)
val e3 = emitSuspends(3)
collect(s2, 2)
emitResumes(e3); expectReplayOf(3)
}
@Test
fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1() =
testSharedFlow<Int>(MutableSharedFlow(1)) {
val a = subscribe("a");
emitRightNow(0); expectReplayOf(0)
collect(a, 0)
emitRightNow(1); expectReplayOf(1)
val e2 = emitSuspends(2) // suspends until 1 is collected
val e3 = emitSuspends(3) // suspends until 1 is collected, too
cancel(a) // must resume emitters 2 & 3
emitResumes(e2)
emitResumes(e3)
expectReplayOf(3) // but replay size is 1 so only 3 should be kept
// Note: originally, SharedFlow was in a broken state here with 3 elements in the buffer
val b = subscribe("b")
collect(b, 3)
emitRightNow(4); expectReplayOf(4)
collect(b, 4)
}
@Test
fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1ExtraBuffer1() =
testSharedFlow<Int>(MutableSharedFlow( replay = 1, extraBufferCapacity = 1)) {
val a = subscribe("a");
emitRightNow(0); expectReplayOf(0)
collect(a, 0)
emitRightNow(1); expectReplayOf(1)
emitRightNow(2); expectReplayOf(2)
val e3 = emitSuspends(3) // suspends until 1 is collected
val e4 = emitSuspends(4) // suspends until 1 is collected, too
val e5 = emitSuspends(5) // suspends until 1 is collected, too
cancel(a) // must resume emitters 3, 4, 5
emitResumes(e3)
emitResumes(e4)
emitResumes(e5)
expectReplayOf(5)
val b = subscribe("b")
collect(b, 5)
emitRightNow(6); expectReplayOf(6)
collect(b, 6)
}
private fun <T> testSharedFlow(
sharedFlow: MutableSharedFlow<T>,
scenario: suspend ScenarioDsl<T>.() -> Unit
) = runTest {
var dsl: ScenarioDsl<T>? = null
try {
coroutineScope {
dsl = ScenarioDsl<T>(sharedFlow, coroutineContext)
dsl!!.scenario()
dsl!!.stop()
}
} catch (e: Throwable) {
dsl?.printLog()
throw e
}
}
private data class TestJob(val job: Job, val name: String) {
override fun toString(): String = name
}
private open class Action
private data class EmitResumes(val job: TestJob) : Action()
private data class Collected(val job: TestJob, val value: Any?) : Action()
private data class ResumeCollecting(val job: TestJob) : Action()
private data class Cancelled(val job: TestJob) : Action()
@OptIn(ExperimentalStdlibApi::class)
private class ScenarioDsl<T>(
val sharedFlow: MutableSharedFlow<T>,
coroutineContext: CoroutineContext
) {
private val log = ArrayList<String>()
private val timeout = 10000L
private val scope = CoroutineScope(coroutineContext + Job())
private val actions = HashSet<Action>()
private val actionWaiters = ArrayDeque<Continuation<Unit>>()
private var expectedReplay = emptyList<T>()
private fun checkReplay() {
assertEquals(expectedReplay, sharedFlow.replayCache)
}
private fun wakeupWaiters() {
repeat(actionWaiters.size) {
actionWaiters.removeFirst().resume(Unit)
}
}
private fun addAction(action: Action) {
actions.add(action)
wakeupWaiters()
}
private suspend fun awaitAction(action: Action) {
withTimeoutOrNull(timeout) {
while (!actions.remove(action)) {
suspendCancellableCoroutine<Unit> { actionWaiters.add(it) }
}
} ?: error("Timed out waiting for action: $action")
wakeupWaiters()
}
private fun launchEmit(a: T): TestJob {
val name = "emit($a)"
val job = scope.launch(start = CoroutineStart.UNDISPATCHED) {
val job = TestJob(coroutineContext[Job]!!, name)
try {
log(name)
sharedFlow.emit(a)
log("$name resumes")
addAction(EmitResumes(job))
} catch(e: CancellationException) {
log("$name cancelled")
addAction(Cancelled(job))
}
}
return TestJob(job, name)
}
fun expectReplayOf(vararg a: T) {
expectedReplay = a.toList()
checkReplay()
}
fun emitRightNow(a: T) {
val job = launchEmit(a)
assertTrue(actions.remove(EmitResumes(job)))
}
fun emitSuspends(a: T): TestJob {
val job = launchEmit(a)
assertFalse(EmitResumes(job) in actions)
checkReplay()
return job
}
suspend fun emitResumes(job: TestJob) {
awaitAction(EmitResumes(job))
}
suspend fun cancel(job: TestJob) {
log("cancel(${job.name})")
job.job.cancel()
awaitAction(Cancelled(job))
}
fun subscribe(id: String): TestJob {
val name = "collect($id)"
val job = scope.launch(start = CoroutineStart.UNDISPATCHED) {
val job = TestJob(coroutineContext[Job]!!, name)
try {
awaitAction(ResumeCollecting(job))
log("$name start")
sharedFlow.collect { value ->
log("$name -> $value")
addAction(Collected(job, value))
awaitAction(ResumeCollecting(job))
log("$name -> $value resumes")
}
error("$name completed")
} catch(e: CancellationException) {
log("$name cancelled")
addAction(Cancelled(job))
}
}
return TestJob(job, name)
}
// collect ~== resumeCollecting + awaitCollected (for each value)
suspend fun collect(job: TestJob, vararg a: T) {
for (value in a) {
checkReplay() // should not have changed
resumeCollecting(job)
awaitCollected(job, value)
}
}
suspend fun resumeCollecting(job: TestJob) {
addAction(ResumeCollecting(job))
}
suspend fun awaitCollected(job: TestJob, value: T) {
awaitAction(Collected(job, value))
}
fun stop() {
log("--- stop")
scope.cancel()
}
private fun log(text: String) {
log.add(text)
}
fun printLog() {
println("--- The most recent log entries ---")
log.takeLast(30).forEach(::println)
println("--- That's it ---")
}
}
}