blob: 2cf8d8ac16a60648049a0307e64f553292e85d5b [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.experimental.scheduling
6
7import kotlinx.coroutines.experimental.*
Roman Elizarov21ce9c02018-08-10 20:38:41 +03008import kotlinx.coroutines.experimental.internal.*
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +03009import kotlinx.coroutines.experimental.scheduling.SchedulerTestBase.Companion.checkPoolThreadsCreated
10import org.junit.*
11import org.junit.Test
12import java.util.concurrent.*
13import java.util.concurrent.atomic.*
14import kotlin.coroutines.experimental.*
15import kotlin.test.*
16
/**
 * Stress test that floods an [ExperimentalCoroutineDispatcher] with trivial tasks, records how many
 * tasks each thread executed, and checks that the per-thread counts add up to the number of
 * submitted tasks.
 */
class CoroutineSchedulerStressTest : TestBase() {
    private val dispatcher: ExperimentalCoroutineDispatcher = ExperimentalCoroutineDispatcher()

    // Number of tasks executed per thread; each thread only ever updates its own entry (see processTask).
    private val observedThreads = ConcurrentHashMap<Thread, Long>()

    // Number of tasks submitted by each test: 500_000 scaled by the stress multiplier.
    private val tasksNum = 500_000 * stressMemoryMultiplier()

    /** Returns `AVAILABLE_PROCESSORS * 4` when [isStressTest] is set and `1` otherwise. */
    private fun stressMemoryMultiplier(): Int = if (isStressTest) AVAILABLE_PROCESSORS * 4 else 1

    // Total number of tasks executed so far, across all threads.
    private val processed = AtomicInteger(0)

    // Opened by the task that brings `processed` up to `tasksNum`.
    private val finishLatch = CountDownLatch(1)

    @After
    fun tearDown() {
        dispatcher.close()
    }

    /** Submits all tasks to [dispatcher] from a task running on [CommonPool]. */
    @Test
    fun testExternalTasksSubmission() {
        stressTest(CommonPool)
    }

    /** Submits all tasks to [dispatcher] from a task running on [dispatcher] itself. */
    @Test
    fun testInternalTasksSubmission() {
        stressTest(dispatcher)
    }

    @Test
    fun testStealingFromBlocking() {
        /*
         * Work-stealing stress test:
         * one thread submits tasks, throttling itself so that at most 100 submitted tasks are
         * unprocessed at any time (to avoid work offloading), and then blocks on the latch.
         * It therefore never executes the tasks it submitted and relies only on work stealing.
         */
        var blockingThread: Thread? = null
        dispatcher.dispatch(EmptyCoroutineContext, Runnable {
            // Record the submitting thread before the first task is dispatched.
            blockingThread = Thread.currentThread()

            // Submit `tasksNum` tasks, yielding whenever the backlog exceeds 100 tasks.
            for (submittedTasks in 1..tasksNum) {
                submitTask()

                while (submittedTasks - processed.get() > 100) {
                    Thread.yield()
                }
            }

            // Block current thread until the last task has been processed.
            finishLatch.await()
        })

        finishLatch.await()

        val submitter = checkNotNull(blockingThread) { "Submitting task did not record its thread" }
        // Use an explicit key lookup: java.util.concurrent.ConcurrentHashMap also declares a legacy
        // member contains(Object) that searches VALUES, so `submitter !in observedThreads` may end up
        // comparing the thread against the Long counters and pass without checking anything.
        assertFalse(
            observedThreads.containsKey(submitter),
            "Submitting thread $submitter must not execute any of the tasks it submitted"
        )
        validateResults()
    }

    /**
     * Dispatches, from a single task running on [submissionInitiator], `tasksNum` tasks to
     * [dispatcher], waits for all of them to finish and validates that
     * 1) the number of threads that executed tasks is `AVAILABLE_PROCESSORS` or `AVAILABLE_PROCESSORS + 1`
     * 2) the per-thread counters add up to `tasksNum` (see [validateResults])
     */
    private fun stressTest(submissionInitiator: CoroutineDispatcher) {
        submissionInitiator.dispatch(EmptyCoroutineContext, Runnable {
            repeat(tasksNum) { submitTask() }
        })

        finishLatch.await()
        val n = observedThreads.size
        println("Observed $n threads with $AVAILABLE_PROCESSORS available processors")
        assertTrue(AVAILABLE_PROCESSORS in (n - 1)..n)
        validateResults()
    }

    /** Dispatches a single [processTask] invocation to [dispatcher]. */
    private fun submitTask() {
        dispatcher.dispatch(EmptyCoroutineContext, Runnable {
            processTask()
        })
    }

    /**
     * Body of every stress task: bumps the counter of the executing thread and opens
     * [finishLatch] once `tasksNum` tasks have been processed.
     */
    private fun processTask() {
        val worker = Thread.currentThread()
        // Plain read-then-write is sufficient here: the key is the current thread itself.
        val counter = observedThreads[worker] ?: 0L
        observedThreads[worker] = counter + 1

        if (processed.incrementAndGet() == tasksNum) {
            finishLatch.countDown()
        }
    }

    /**
     * Asserts that the per-thread counters sum to `tasksNum`, then calls
     * `checkPoolThreadsCreated(AVAILABLE_PROCESSORS)`.
     * NOTE(review): `checkPoolThreadsCreated` is defined in `SchedulerTestBase`; presumably it
     * verifies how many scheduler threads were created - confirm against its implementation.
     */
    private fun validateResults() {
        val result = observedThreads.values.sum()
        assertEquals(tasksNum.toLong(), result)
        checkPoolThreadsCreated(AVAILABLE_PROCESSORS)
    }
}