blob: 7d346bdc338ac542d5ad7f4180f1a2192584b590 [file] [log] [blame]
Roman Elizarov34c34642020-10-13 14:02:52 +03001/*
2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5package kotlinx.coroutines.flow
6
7import kotlinx.coroutines.*
8import org.junit.*
9import org.junit.Test
10import java.util.*
11import java.util.concurrent.atomic.*
12import kotlin.random.*
13import kotlin.test.*
14import kotlin.time.*
15import kotlin.time.TimeSource
16
17@OptIn(ExperimentalTime::class)
18class SharingStressTest : TestBase() {
19 private val testDuration = 1000L * stressTestMultiplier
20 private val nSubscribers = 5
21 private val testStarted = TimeSource.Monotonic.markNow()
22
23 @get:Rule
24 val emitterDispatcher = ExecutorRule(1)
25
26 @get:Rule
27 val subscriberDispatcher = ExecutorRule(nSubscribers)
28
29 @Test
30 public fun testNoReplayLazy() =
31 testStress(0, started = SharingStarted.Lazily)
32
33 @Test
34 public fun testNoReplayWhileSubscribed() =
35 testStress(0, started = SharingStarted.WhileSubscribed())
36
37 @Test
38 public fun testNoReplayWhileSubscribedTimeout() =
39 testStress(0, started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 50L))
40
41 @Test
42 public fun testReplay100WhileSubscribed() =
43 testStress(100, started = SharingStarted.WhileSubscribed())
44
45 @Test
46 public fun testReplay100WhileSubscribedReset() =
47 testStress(100, started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0L))
48
49 @Test
50 public fun testReplay100WhileSubscribedTimeout() =
51 testStress(100, started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 50L))
52
53 @Test
54 public fun testStateLazy() =
55 testStress(1, started = SharingStarted.Lazily)
56
57 @Test
58 public fun testStateWhileSubscribed() =
59 testStress(1, started = SharingStarted.WhileSubscribed())
60
61 @Test
62 public fun testStateWhileSubscribedReset() =
63 testStress(1, started = SharingStarted.WhileSubscribed(replayExpirationMillis = 0L))
64
65 private fun testStress(replay: Int, started: SharingStarted) = runTest {
66 log("-- Stress with replay=$replay, started=$started")
67 val random = Random(1)
68 val emitIndex = AtomicLong()
69 val cancelledEmits = HashSet<Long>()
70 val missingCollects = Collections.synchronizedSet(LinkedHashSet<Long>())
71 // at most one copy of upstream can be running at any time
72 val isRunning = AtomicInteger(0)
73 val upstream = flow {
74 assertEquals(0, isRunning.getAndIncrement())
75 try {
76 while (true) {
77 val value = emitIndex.getAndIncrement()
78 try {
79 emit(value)
80 } catch (e: CancellationException) {
81 // emission was cancelled -> could be missing
82 cancelledEmits.add(value)
83 throw e
84 }
85 }
86 } finally {
87 assertEquals(1, isRunning.getAndDecrement())
88 }
89 }
90 val subCount = MutableStateFlow(0)
91 val sharingJob = Job()
92 val sharingScope = this + emitterDispatcher + sharingJob
93 val usingStateFlow = replay == 1
94 val sharedFlow = if (usingStateFlow)
95 upstream.stateIn(sharingScope, started, 0L)
96 else
97 upstream.shareIn(sharingScope, started, replay)
98 try {
99 val subscribers = ArrayList<SubJob>()
100 withTimeoutOrNull(testDuration) {
101 // start and stop subscribers
102 while (true) {
103 log("Staring $nSubscribers subscribers")
104 repeat(nSubscribers) {
105 subscribers += launchSubscriber(sharedFlow, usingStateFlow, subCount, missingCollects)
106 }
107 // wait until they all subscribed
108 subCount.first { it == nSubscribers }
109 // let them work a bit more & make sure emitter did not hang
110 val fromEmitIndex = emitIndex.get()
111 val waitEmitIndex = fromEmitIndex + 100 // wait until 100 emitted
112 withTimeout(10000) { // wait for at most 10s for something to be emitted
113 do {
114 delay(random.nextLong(50L..100L))
115 } while (emitIndex.get() < waitEmitIndex) // Ok, enough was emitted, wait more if not
116 }
117 // Stop all subscribers and ensure they collected something
118 log("Stopping subscribers (emitted = ${emitIndex.get() - fromEmitIndex})")
119 subscribers.forEach {
120 it.job.cancelAndJoin()
121 assertTrue { it.count > 0 } // something must be collected too
122 }
123 subscribers.clear()
124 log("Intermission")
125 delay(random.nextLong(10L..100L)) // wait a bit before starting them again
126 }
127 }
128 if (!subscribers.isEmpty()) {
129 log("Stopping subscribers")
130 subscribers.forEach { it.job.cancelAndJoin() }
131 }
132 } finally {
133 log("--- Finally: Cancelling sharing job")
134 sharingJob.cancel()
135 }
136 sharingJob.join() // make sure sharing job did not hang
137 log("Emitter was cancelled ${cancelledEmits.size} times")
138 log("Collectors missed ${missingCollects.size} values")
139 for (value in missingCollects) {
140 assertTrue(value in cancelledEmits, "Value $value is missing for no apparent reason")
141 }
142 }
143
144 private fun CoroutineScope.launchSubscriber(
145 sharedFlow: SharedFlow<Long>,
146 usingStateFlow: Boolean,
147 subCount: MutableStateFlow<Int>,
148 missingCollects: MutableSet<Long>
149 ): SubJob {
150 val subJob = SubJob()
151 subJob.job = launch(subscriberDispatcher) {
152 var last = -1L
153 sharedFlow
154 .onSubscription {
155 subCount.increment(1)
156 }
157 .onCompletion {
158 subCount.increment(-1)
159 }
160 .collect { j ->
161 subJob.count++
162 // last must grow sequentially, no jumping or losses
163 if (last == -1L) {
164 last = j
165 } else {
166 val expected = last + 1
167 if (usingStateFlow)
168 assertTrue(expected <= j)
169 else {
170 if (expected != j) {
171 if (j == expected + 1) {
172 // if missing just one -- could be race with cancelled emit
173 missingCollects.add(expected)
174 } else {
175 // broken otherwise
176 assertEquals(expected, j)
177 }
178 }
179 }
180 last = j
181 }
182 }
183 }
184 return subJob
185 }
186
187 private class SubJob {
188 lateinit var job: Job
189 var count = 0L
190 }
191
192 private fun log(msg: String) = println("${testStarted.elapsedNow().toLongMilliseconds()} ms: $msg")
193}