blob: ef4ce14b7f49f25923cdaf5f75c0a740a3bce669 [file] [log] [blame]
package kotlinx.coroutines.experimental.scheduling
import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.*
import org.junit.*
import kotlin.coroutines.experimental.*
abstract class SchedulerTestBase : TestBase() {
companion object {
val CORES_COUNT = Runtime.getRuntime().availableProcessors()
/**
* Asserts that [expectedThreadsCount] pool worker threads were created.
* Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking
*/
fun checkPoolThreadsCreated(expectedThreadsCount: Int = CORES_COUNT) {
val threadsCount = maxSequenceNumber()!! + 1
require(threadsCount == expectedThreadsCount)
{ "Expected $expectedThreadsCount pool threads, but has $threadsCount" }
}
/**
* Asserts that any number of pool worker threads in [range] were created.
* Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking
*/
fun checkPoolThreadsCreated(range: IntRange) {
val maxSequenceNumber = maxSequenceNumber()!! + 1
require(maxSequenceNumber in range) { "Expected pool threads to be in interval $range, but has $maxSequenceNumber" }
}
/**
* Asserts that any number of pool worker threads in [range] exists at the time of method invocation
*/
fun checkPoolThreadsExist(range: IntRange) {
val threads = Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.PoolWorker }.count()
require(threads in range) { "Expected threads in $range interval, but has $threads" }
}
/**
* Asserts that [expectedThreadsCount] of pool worker threads exists at the time of method invocation
*/
fun checkPoolThreadsExist(expectedThreadsCount: Int = CORES_COUNT) {
val threads = Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.PoolWorker }.count()
require(threads == expectedThreadsCount) { "Expected $expectedThreadsCount threads, but has $threads" }
}
fun initialPoolSize() = Runtime.getRuntime().availableProcessors().coerceAtMost(2)
private fun maxSequenceNumber(): Int? {
return Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.PoolWorker }
.map { sequenceNumber(it.name) }.max()
}
private fun sequenceNumber(threadName: String): Int {
val suffix = threadName.substring(threadName.lastIndexOf("-") + 1)
val separatorIndex = suffix.indexOf(' ')
if (separatorIndex == -1) {
return suffix.toInt()
}
return suffix.substring(0, separatorIndex).toInt()
}
suspend fun Iterable<Job>.joinAll() = forEach { it.join() }
}
private var exception = atomic<Throwable?>(null)
private val handler = CoroutineExceptionHandler({ _, e -> exception.value = e })
protected var corePoolSize = 1
protected var maxPoolSize = 1024
private var _dispatcher: ExperimentalCoroutineDispatcher? = null
protected val dispatcher: CoroutineContext
get() {
if (_dispatcher == null) {
_dispatcher = ExperimentalCoroutineDispatcher(corePoolSize, maxPoolSize)
}
return _dispatcher!! + handler
}
protected var blockingDispatcher = lazy {
blockingDispatcher(1000)
}
protected fun blockingDispatcher(parallelism: Int): CoroutineContext {
val intitialize = dispatcher
return _dispatcher!!.blocking(parallelism) + handler
}
@After
fun after() {
runBlocking {
withTimeout(5_000) {
_dispatcher?.close()
}
}
exception.value?.let { throw it }
}
}