blob: cb49f054ce2964ae0954499b350c71622b62c2f3 [file] [log] [blame]
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +03001/*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
Roman Elizarov0950dfa2018-07-13 10:33:25 +03005package kotlinx.coroutines.scheduling
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +03006
Vsevolod Tolstopyatov4236c8c2019-10-11 00:35:47 +03007import kotlinx.atomicfu.*
Roman Elizarov0950dfa2018-07-13 10:33:25 +03008import kotlinx.coroutines.*
9import kotlinx.coroutines.internal.*
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030010import org.junit.*
11import org.junit.Test
12import java.util.concurrent.*
13import java.util.concurrent.atomic.*
Roman Elizarov0950dfa2018-07-13 10:33:25 +030014import kotlin.coroutines.*
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030015import kotlin.test.*
16
17class CoroutineSchedulerStressTest : TestBase() {
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030018 private var dispatcher: ExperimentalCoroutineDispatcher = ExperimentalCoroutineDispatcher()
19 private val observedThreads = ConcurrentHashMap<Thread, Long>()
Vsevolod Tolstopyatov68391512018-08-13 12:46:53 +030020 private val tasksNum = 500_000 * stressMemoryMultiplier()
Vsevolod Tolstopyatovfb6f2402018-08-08 19:28:47 +030021
22 private fun stressMemoryMultiplier(): Int {
23 return if (isStressTest) {
Roman Elizarov21ce9c02018-08-10 20:38:41 +030024 AVAILABLE_PROCESSORS * 4
Vsevolod Tolstopyatovfb6f2402018-08-08 19:28:47 +030025 } else {
26 1
27 }
28 }
29
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030030 private val processed = AtomicInteger(0)
31 private val finishLatch = CountDownLatch(1)
32
33 @After
34 fun tearDown() {
35 dispatcher.close()
36 }
37
38 @Test
Vsevolod Tolstopyatov4236c8c2019-10-11 00:35:47 +030039 fun testInternalTasksSubmissionProgress() {
Vsevolod Tolstopyatovab30d722019-10-17 16:02:36 +030040 /*
41 * Run a lot of tasks and validate that
Vsevolod Tolstopyatov4236c8c2019-10-11 00:35:47 +030042 * 1) All of them are completed successfully
43 * 2) Every thread executed task at least once
44 */
45 dispatcher.dispatch(EmptyCoroutineContext, Runnable {
46 for (i in 1..tasksNum) {
47 dispatcher.dispatch(EmptyCoroutineContext, ValidatingRunnable())
48 }
49 })
50
51 finishLatch.await()
52 val observed = observedThreads.size
53 // on slow machines not all threads can be observed
54 assertTrue(observed in (AVAILABLE_PROCESSORS - 1)..(AVAILABLE_PROCESSORS + 1), "Observed $observed threads with $AVAILABLE_PROCESSORS available processors")
55 validateResults()
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030056 }
57
58 @Test
Vsevolod Tolstopyatov4236c8c2019-10-11 00:35:47 +030059 fun testStealingFromNonProgressing() {
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030060 /*
61 * Work-stealing stress test,
62 * one thread submits pack of tasks, waits until they are completed (to avoid work offloading)
63 * and then repeats, thus never executing its own tasks and relying only on work stealing.
64 */
65 var blockingThread: Thread? = null
66 dispatcher.dispatch(EmptyCoroutineContext, Runnable {
67 // Submit million tasks
68 blockingThread = Thread.currentThread()
69 var submittedTasks = 0
Vsevolod Tolstopyatovfb6f2402018-08-08 19:28:47 +030070 while (submittedTasks < tasksNum) {
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030071
Vsevolod Tolstopyatovfb6f2402018-08-08 19:28:47 +030072 ++submittedTasks
Vsevolod Tolstopyatov4236c8c2019-10-11 00:35:47 +030073 dispatcher.dispatch(EmptyCoroutineContext, ValidatingRunnable())
Vsevolod Tolstopyatovfb6f2402018-08-08 19:28:47 +030074 while (submittedTasks - processed.get() > 100) {
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030075 Thread.yield()
76 }
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030077 }
Vsevolod Tolstopyatovfb6f2402018-08-08 19:28:47 +030078 // Block current thread
79 finishLatch.await()
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030080 })
81
82 finishLatch.await()
83
Vsevolod Tolstopyatovab30d722019-10-17 16:02:36 +030084 assertFalse(observedThreads.containsKey(blockingThread!!))
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030085 validateResults()
86 }
87
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030088 private fun processTask() {
89 val counter = observedThreads[Thread.currentThread()] ?: 0L
90 observedThreads[Thread.currentThread()] = counter + 1
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030091 if (processed.incrementAndGet() == tasksNum) {
92 finishLatch.countDown()
93 }
94 }
95
96 private fun validateResults() {
97 val result = observedThreads.values.sum()
98 assertEquals(tasksNum.toLong(), result)
Vsevolod Tolstopyatov4236c8c2019-10-11 00:35:47 +030099 }
100
101 private inner class ValidatingRunnable : Runnable {
102 private val invoked = atomic(false)
103 override fun run() {
104 if (!invoked.compareAndSet(false, true)) error("The same runnable was invoked twice")
105 processTask()
106 }
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +0300107 }
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +0300108}