blob: 683a889efac3354a52f46a1d6c45099f8b3998d6 [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
Roman Elizarov0950dfa2018-07-13 10:33:25 +03005package kotlinx.coroutines.scheduling
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +03006
Roman Elizarov0950dfa2018-07-13 10:33:25 +03007import kotlinx.coroutines.*
8import kotlinx.coroutines.internal.*
9import kotlinx.coroutines.scheduling.SchedulerTestBase.Companion.checkPoolThreadsCreated
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030010import org.junit.*
Roman Elizarov7e8b52e2018-12-19 17:01:46 +030011import org.junit.Ignore
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030012import org.junit.Test
13import java.util.concurrent.*
14import java.util.concurrent.atomic.*
Roman Elizarov0950dfa2018-07-13 10:33:25 +030015import kotlin.coroutines.*
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +030016import kotlin.test.*
17
/**
 * Stress test that pushes [tasksNum] trivial tasks through an [ExperimentalCoroutineDispatcher]
 * and then checks that every task was executed exactly once and by which threads.
 *
 * Each test waits on [finishLatch], which is released by [processTask] once [processed]
 * reaches [tasksNum]. The dispatcher is closed after every test in [tearDown].
 */
class CoroutineSchedulerStressTest : TestBase() {
    private val dispatcher: ExperimentalCoroutineDispatcher = ExperimentalCoroutineDispatcher()

    // Number of tasks executed by each thread that ran at least one task.
    private val observedThreads = ConcurrentHashMap<Thread, Long>()

    // Total number of tasks submitted by a single test.
    private val tasksNum = 500_000 * stressMemoryMultiplier()

    /**
     * Returns the factor by which the base task count is scaled:
     * `AVAILABLE_PROCESSORS * 4` when [isStressTest] is set, `1` otherwise.
     */
    private fun stressMemoryMultiplier(): Int =
        if (isStressTest) AVAILABLE_PROCESSORS * 4 else 1

    // Number of tasks completed so far (across all threads).
    private val processed = AtomicInteger(0)

    // Released once the last of the `tasksNum` tasks has been processed.
    private val finishLatch = CountDownLatch(1)

    @After
    fun tearDown() {
        dispatcher.close()
    }

    /** Submits all tasks to [dispatcher] from a task running on [CommonPool]. */
    @Test
    @Suppress("DEPRECATION")
    @Ignore // This test often fails on Windows. TODO: figure out how to fix it, see issue #904
    fun testExternalTasksSubmission() {
        stressTest(CommonPool)
    }

    /** Submits all tasks to [dispatcher] from a task running on [dispatcher] itself. */
    @Test
    fun testInternalTasksSubmission() {
        stressTest(dispatcher)
    }

    @Test
    fun testStealingFromBlocking() {
        /*
         * Work-stealing stress test:
         * one thread submits a pack of tasks, waits until they are completed (to avoid work offloading)
         * and then repeats, thus never executing its own tasks and relying only on work stealing.
         */
        var blockingThread: Thread? = null
        dispatcher.dispatch(EmptyCoroutineContext, Runnable {
            // Submit `tasksNum` tasks from this single thread
            blockingThread = Thread.currentThread()
            var submittedTasks = 0
            while (submittedTasks < tasksNum) {
                ++submittedTasks
                dispatcher.dispatch(EmptyCoroutineContext, Runnable {
                    processTask()
                })

                // Throttle: keep at most 100 submitted-but-unprocessed tasks in flight
                while (submittedTasks - processed.get() > 100) {
                    Thread.yield()
                }
            }

            // Block current thread until the last task is processed
            finishLatch.await()
        })

        finishLatch.await()

        // The submitting thread must not appear among the threads that executed tasks.
        // Test assertions (rather than `require`/`!!`) give a proper failure message.
        val submitter = assertNotNull(blockingThread, "Submitting task did not record its thread")
        assertFalse(
            observedThreads.containsKey(submitter),
            "Submitting thread $submitter is not expected to execute any of the submitted tasks"
        )
        validateResults()
    }

    /**
     * Dispatches a single task on [submissionInitiator] that in turn dispatches [tasksNum]
     * tasks on [dispatcher], waits for all of them and validates the outcome.
     */
    private fun stressTest(submissionInitiator: CoroutineDispatcher) {
        /*
         * Run `tasksNum` tasks and validate that
         * 1) All of them are completed successfully
         * 2) The number of threads that executed at least one task is within
         *    one of AVAILABLE_PROCESSORS
         */
        submissionInitiator.dispatch(EmptyCoroutineContext, Runnable {
            for (i in 1..tasksNum) {
                dispatcher.dispatch(EmptyCoroutineContext, Runnable {
                    processTask()
                })
            }
        })

        finishLatch.await()
        val observed = observedThreads.size
        // on slow machines not all threads can be observed
        assertTrue(
            observed in (AVAILABLE_PROCESSORS - 1)..(AVAILABLE_PROCESSORS + 1),
            "Observed $observed threads with $AVAILABLE_PROCESSORS available processors"
        )
        validateResults()
    }

    /**
     * Body of every submitted task: bumps the per-thread counter of the executing thread
     * and releases [finishLatch] when the last task has been processed.
     */
    private fun processTask() {
        // The read-then-write is not atomic, but the key is always the current thread,
        // so each entry is only ever updated by the thread it belongs to.
        val current = Thread.currentThread()
        val counter = observedThreads[current] ?: 0L
        observedThreads[current] = counter + 1

        if (processed.incrementAndGet() == tasksNum) {
            finishLatch.countDown()
        }
    }

    /**
     * Checks that the per-thread counters add up to [tasksNum] and delegates the
     * thread-count check to `checkPoolThreadsCreated`.
     */
    private fun validateResults() {
        val result = observedThreads.values.sum()
        assertEquals(tasksNum.toLong(), result)
        checkPoolThreadsCreated(AVAILABLE_PROCESSORS)
    }
}