blob: cb49f054ce2964ae0954499b350c71622b62c2f3 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.scheduling
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*
import kotlin.test.*
class CoroutineSchedulerStressTest : TestBase() {
private var dispatcher: ExperimentalCoroutineDispatcher = ExperimentalCoroutineDispatcher()
private val observedThreads = ConcurrentHashMap<Thread, Long>()
private val tasksNum = 500_000 * stressMemoryMultiplier()
private fun stressMemoryMultiplier(): Int {
return if (isStressTest) {
AVAILABLE_PROCESSORS * 4
} else {
1
}
}
private val processed = AtomicInteger(0)
private val finishLatch = CountDownLatch(1)
@After
fun tearDown() {
dispatcher.close()
}
@Test
fun testInternalTasksSubmissionProgress() {
/*
* Run a lot of tasks and validate that
* 1) All of them are completed successfully
* 2) Every thread executed task at least once
*/
dispatcher.dispatch(EmptyCoroutineContext, Runnable {
for (i in 1..tasksNum) {
dispatcher.dispatch(EmptyCoroutineContext, ValidatingRunnable())
}
})
finishLatch.await()
val observed = observedThreads.size
// on slow machines not all threads can be observed
assertTrue(observed in (AVAILABLE_PROCESSORS - 1)..(AVAILABLE_PROCESSORS + 1), "Observed $observed threads with $AVAILABLE_PROCESSORS available processors")
validateResults()
}
@Test
fun testStealingFromNonProgressing() {
/*
* Work-stealing stress test,
* one thread submits pack of tasks, waits until they are completed (to avoid work offloading)
* and then repeats, thus never executing its own tasks and relying only on work stealing.
*/
var blockingThread: Thread? = null
dispatcher.dispatch(EmptyCoroutineContext, Runnable {
// Submit million tasks
blockingThread = Thread.currentThread()
var submittedTasks = 0
while (submittedTasks < tasksNum) {
++submittedTasks
dispatcher.dispatch(EmptyCoroutineContext, ValidatingRunnable())
while (submittedTasks - processed.get() > 100) {
Thread.yield()
}
}
// Block current thread
finishLatch.await()
})
finishLatch.await()
assertFalse(observedThreads.containsKey(blockingThread!!))
validateResults()
}
private fun processTask() {
val counter = observedThreads[Thread.currentThread()] ?: 0L
observedThreads[Thread.currentThread()] = counter + 1
if (processed.incrementAndGet() == tasksNum) {
finishLatch.countDown()
}
}
private fun validateResults() {
val result = observedThreads.values.sum()
assertEquals(tasksNum.toLong(), result)
}
private inner class ValidatingRunnable : Runnable {
private val invoked = atomic(false)
override fun run() {
if (!invoked.compareAndSet(false, true)) error("The same runnable was invoked twice")
processTask()
}
}
}