blob: 06957ff733fabcc7344e31fb99166b8acdd43d11 [file] [log] [blame]
package kotlinx.coroutines.experimental.scheduling
import kotlinx.coroutines.experimental.*
import org.junit.*
import java.util.concurrent.atomic.*
class BlockingCoroutineDispatcherRaceStressTest : SchedulerTestBase() {
private val concurrentWorkers = AtomicInteger(0)
@Before
fun setUp() {
// In case of starvation test will hang
idleWorkerKeepAliveNs = Long.MAX_VALUE
}
@Test
fun testAddPollRace() = runBlocking {
val limitingDispatcher = blockingDispatcher(1)
val iterations = 25_000 * stressTestMultiplier
// Stress test for specific case (race #2 from LimitingDispatcher). Shouldn't hang.
for (i in 1..iterations) {
val tasks = (1..2).map {
async(limitingDispatcher) {
try {
val currentlyExecuting = concurrentWorkers.incrementAndGet()
require(currentlyExecuting == 1)
} finally {
concurrentWorkers.decrementAndGet()
}
}
}
tasks.forEach { it.await() }
}
checkPoolThreadsCreated(2..4)
}
@Test
fun testPingPongThreadsCount() = runBlocking {
corePoolSize = CORES_COUNT
val iterations = 100_000 * stressTestMultiplier
// Stress test for specific case (race #2 from LimitingDispatcher). Shouldn't hang.
for (i in 1..iterations) {
val tasks = (1..2).map {
async(dispatcher) {
// Useless work
concurrentWorkers.incrementAndGet()
concurrentWorkers.decrementAndGet()
}
}
tasks.forEach { it.await() }
}
checkPoolThreadsCreated(CORES_COUNT)
}
}