blob: 55c90bd546305c3afe274966d664cb21d3be1cb1 [file] [log] [blame]
package kotlinx.coroutines.experimental.scheduling
import kotlinx.coroutines.experimental.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.atomic.*
import kotlin.coroutines.experimental.*
import kotlin.test.*
class CoroutineDispatcherTest : SchedulerTestBase() {
@After
fun tearDown() {
schedulerTimeSource = NanoTimeSource
}
@Test
fun testSingleThread() = runBlocking {
expect(1)
withContext(dispatcher) {
require(Thread.currentThread() is CoroutineScheduler.PoolWorker)
expect(2)
val job = async(coroutineContext) {
expect(3)
delay(10)
expect(4)
}
job.await()
expect(5)
}
finish(6)
checkPoolThreadsCreated(1)
}
@Test
fun testFairScheduling() = runBlocking {
corePoolSize = 1
expect(1)
val outerJob = launch(dispatcher) {
val d1 = launch(dispatcher) { expect(3) }
val d2 = launch(dispatcher) { expect(4) }
val d3 = launch(dispatcher) { expect(2) }
listOf(d1, d2, d3).joinAll()
}
outerJob.join()
finish(5)
}
@Test
fun testStealing() = runBlocking {
corePoolSize = 2
val flag = AtomicBoolean(false)
val job = async(context = dispatcher) {
expect(1)
val innerJob = async {
expect(2)
flag.set(true)
}
while (!flag.get()) {
Thread.yield() // Block current thread, submitted inner job will be stolen
}
innerJob.await()
expect(3)
}
job.await()
finish(4)
checkPoolThreadsCreated(2)
}
@Test
fun testNoStealing() = runBlocking {
corePoolSize = CORES_COUNT
schedulerTimeSource = TestTimeSource(0L)
withContext(dispatcher) {
val thread = Thread.currentThread()
val job = async(dispatcher) {
assertEquals(thread, Thread.currentThread())
val innerJob = async(dispatcher) {
assertEquals(thread, Thread.currentThread())
}
innerJob.await()
}
job.await()
assertEquals(thread, Thread.currentThread())
}
checkPoolThreadsCreated(initialPoolSize()..initialPoolSize() + 1)
}
@Test
fun testDelay() = runBlocking {
corePoolSize = 2
withContext(dispatcher) {
expect(1)
delay(10)
expect(2)
}
finish(3)
checkPoolThreadsCreated(2)
}
@Test
fun testWithTimeout() = runBlocking {
corePoolSize = CORES_COUNT
withContext(dispatcher) {
expect(1)
val result = withTimeoutOrNull(1000) {
expect(2)
yield() // yield only now
"OK"
}
assertEquals("OK", result)
val nullResult = withTimeoutOrNull(1000) {
expect(3)
while (true) {
yield()
}
}
assertNull(nullResult)
finish(4)
}
checkPoolThreadsCreated(initialPoolSize()..CORES_COUNT)
}
@Test
fun testMaxSize() = runBlocking {
corePoolSize = 1
maxPoolSize = 4
(1..10).map { launch(blockingDispatcher.value) { Thread.sleep(100) } }.joinAll()
checkPoolThreadsCreated(4)
}
@Test(timeout = 1_000)
fun testYield() = runBlocking {
corePoolSize = 1
maxPoolSize = 1
val outerJob = launch(dispatcher) {
expect(1)
val innerJob = launch(dispatcher) {
// Do nothing
expect(3)
}
expect(2)
while (innerJob.isActive) {
yield()
}
expect(4)
innerJob.join()
}
outerJob.join()
finish(5)
}
}