/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
| 4 | |
| 5 | package kotlinx.coroutines.flow |
| 6 | |
| 7 | import kotlinx.coroutines.* |
| 8 | import org.junit.* |
| 9 | import org.junit.Test |
| 10 | import java.util.* |
| 11 | import java.util.concurrent.atomic.* |
| 12 | import kotlin.random.* |
| 13 | import kotlin.test.* |
| 14 | import kotlin.time.* |
| 15 | import kotlin.time.TimeSource |
| 16 | |
/**
 * Stress test for flows produced by [shareIn] and [stateIn].
 *
 * A single upstream flow emits an ever-increasing sequence of numbers while batches of
 * [nSubscribers] concurrent subscribers are repeatedly started and cancelled. The test checks that:
 *  - at most one copy of the upstream is running at any time;
 *  - the emitter keeps making progress while subscribers are present;
 *  - every subscriber collects at least one value;
 *  - subscribers observe values in order, and any single-value gap seen by a `shareIn` subscriber
 *    corresponds to an emission that was cancelled.
 */
@OptIn(ExperimentalTime::class)
class SharingStressTest : TestBase() {
    // Duration (ms) of the start/stop subscriber cycle for each scenario.
    // `stressTestMultiplier` is defined outside this file (presumably by TestBase — confirm).
    private val testDuration = 1000L * stressTestMultiplier
    private val nSubscribers = 5
    // Reference point for the elapsed-time prefix printed by [log].
    private val testStarted = TimeSource.Monotonic.markNow()

    // Context element for the sharing coroutine.
    // NOTE(review): presumably an executor with a single thread — confirm in ExecutorRule.
    @get:Rule
    val emitterDispatcher = ExecutorRule(1)

    // Context element for the subscriber coroutines (sized by the number of subscribers).
    @get:Rule
    val subscriberDispatcher = ExecutorRule(nSubscribers)

    @Test
    fun testNoReplayLazy() =
        testStress(0, started = SharingStarted.Lazily)

    @Test
    fun testNoReplayWhileSubscribed() =
        testStress(0, started = SharingStarted.WhileSubscribed())

    @Test
    fun testNoReplayWhileSubscribedTimeout() =
        testStress(0, started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 50L))

    @Test
    fun testReplay100WhileSubscribed() =
        testStress(100, started = SharingStarted.WhileSubscribed())

    @Test
    fun testReplay100WhileSubscribedReset() =
        testStress(100, started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0L))

    @Test
    fun testReplay100WhileSubscribedTimeout() =
        testStress(100, started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 50L))

    @Test
    fun testStateLazy() =
        testStress(1, started = SharingStarted.Lazily)

    @Test
    fun testStateWhileSubscribed() =
        testStress(1, started = SharingStarted.WhileSubscribed())

    @Test
    fun testStateWhileSubscribedReset() =
        testStress(1, started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0L))

    /**
     * Runs one stress scenario.
     *
     * @param replay replay size passed to `shareIn`; the value `1` instead selects `stateIn`
     * with an initial value of `0L`.
     * @param started the sharing start strategy under test.
     */
    private fun testStress(replay: Int, started: SharingStarted) = runTest {
        log("-- Stress with replay=$replay, started=$started")
        val random = Random(1)
        val emitIndex = AtomicLong()
        // Values whose `emit` call was cancelled; written only from the upstream flow body.
        val cancelledEmits = HashSet<Long>()
        // Values that some subscriber skipped (a gap of exactly one); written by subscribers concurrently.
        val missingCollects = Collections.synchronizedSet(LinkedHashSet<Long>())
        // at most one copy of upstream can be running at any time
        val isRunning = AtomicInteger(0)
        val upstream = flow {
            assertEquals(0, isRunning.getAndIncrement())
            try {
                while (true) {
                    val value = emitIndex.getAndIncrement()
                    try {
                        emit(value)
                    } catch (e: CancellationException) {
                        // emission was cancelled -> could be missing
                        cancelledEmits.add(value)
                        throw e
                    }
                }
            } finally {
                assertEquals(1, isRunning.getAndDecrement())
            }
        }
        val subCount = MutableStateFlow(0)
        // Dedicated job so that sharing can be cancelled and joined independently of the test scope.
        val sharingJob = Job()
        val sharingScope = this + emitterDispatcher + sharingJob
        val usingStateFlow = replay == 1
        val sharedFlow = if (usingStateFlow)
            upstream.stateIn(sharingScope, started, 0L)
        else
            upstream.shareIn(sharingScope, started, replay)
        try {
            val subscribers = ArrayList<SubJob>()
            // A timeout here is the normal way for the cycle below to end.
            withTimeoutOrNull(testDuration) {
                // start and stop subscribers
                while (true) {
                    log("Starting $nSubscribers subscribers")
                    repeat(nSubscribers) {
                        subscribers += launchSubscriber(sharedFlow, usingStateFlow, subCount, missingCollects)
                    }
                    // wait until they all subscribed
                    subCount.first { it == nSubscribers }
                    // let them work a bit more & make sure emitter did not hang
                    val fromEmitIndex = emitIndex.get()
                    val waitEmitIndex = fromEmitIndex + 100 // wait until 100 emitted
                    withTimeout(10000) { // wait for at most 10s for something to be emitted
                        do {
                            delay(random.nextLong(50L..100L))
                        } while (emitIndex.get() < waitEmitIndex) // Ok, enough was emitted, wait more if not
                    }
                    // Stop all subscribers and ensure they collected something
                    log("Stopping subscribers (emitted = ${emitIndex.get() - fromEmitIndex})")
                    subscribers.forEach {
                        it.job.cancelAndJoin()
                        assertTrue { it.count > 0 } // something must be collected too
                    }
                    subscribers.clear()
                    log("Intermission")
                    delay(random.nextLong(10L..100L)) // wait a bit before starting them again
                }
            }
            // The timeout may have fired in the middle of a cycle, leaving subscribers running.
            if (subscribers.isNotEmpty()) {
                log("Stopping subscribers")
                subscribers.forEach { it.job.cancelAndJoin() }
            }
        } finally {
            log("--- Finally: Cancelling sharing job")
            sharingJob.cancel()
        }
        sharingJob.join() // make sure sharing job did not hang
        log("Emitter was cancelled ${cancelledEmits.size} times")
        log("Collectors missed ${missingCollects.size} values")
        // Every value skipped by a collector must be explained by a cancelled emission.
        for (value in missingCollects) {
            assertTrue(value in cancelledEmits, "Value $value is missing for no apparent reason")
        }
    }

    /**
     * Launches one subscriber of [sharedFlow] in [subscriberDispatcher] and returns its [SubJob].
     *
     * The subscriber adjusts [subCount] by +1 in `onSubscription` and by -1 in `onCompletion`
     * (via the `increment` helper, which is defined outside this file), counts collected values
     * in [SubJob.count], and validates ordering of the values it receives.
     *
     * @param usingStateFlow when `true`, values only need to be strictly increasing;
     * otherwise they must be consecutive, except for gaps of exactly one value.
     * @param missingCollects receives the skipped value whenever a gap of exactly one is observed.
     */
    private fun CoroutineScope.launchSubscriber(
        sharedFlow: SharedFlow<Long>,
        usingStateFlow: Boolean,
        subCount: MutableStateFlow<Int>,
        missingCollects: MutableSet<Long>
    ): SubJob {
        val subJob = SubJob()
        subJob.job = launch(subscriberDispatcher) {
            var last = -1L // -1 means "nothing collected yet"
            sharedFlow
                .onSubscription {
                    subCount.increment(1)
                }
                .onCompletion {
                    subCount.increment(-1)
                }
                .collect { j ->
                    subJob.count++
                    // last must grow sequentially, no jumping or losses
                    if (last != -1L) {
                        val expected = last + 1
                        when {
                            // state flow: only require that values keep growing
                            usingStateFlow -> assertTrue(expected <= j)
                            expected == j -> Unit
                            // if missing just one -- could be race with cancelled emit
                            j == expected + 1 -> missingCollects.add(expected)
                            // broken otherwise
                            else -> assertEquals(expected, j)
                        }
                    }
                    last = j
                }
        }
        return subJob
    }

    /** Handle of a launched subscriber: its [job] and the number of values it has collected. */
    private class SubJob {
        lateinit var job: Job
        var count = 0L
    }

    /** Prints [msg] prefixed with the milliseconds elapsed since this test instance was created. */
    private fun log(msg: String) = println("${testStarted.elapsedNow().toLongMilliseconds()} ms: $msg")
}