Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| 3 | */ |
| 4 | |
| 5 | package kotlinx.coroutines.experimental.scheduling |
| 6 | |
| 7 | import kotlinx.coroutines.experimental.* |
Roman Elizarov | 21ce9c0 | 2018-08-10 20:38:41 +0300 | [diff] [blame] | 8 | import kotlinx.coroutines.experimental.internal.* |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 9 | import kotlinx.coroutines.experimental.scheduling.SchedulerTestBase.Companion.checkPoolThreadsCreated |
| 10 | import org.junit.* |
| 11 | import org.junit.Test |
| 12 | import java.util.concurrent.* |
| 13 | import java.util.concurrent.atomic.* |
| 14 | import kotlin.coroutines.experimental.* |
| 15 | import kotlin.test.* |
| 16 | |
| 17 | class CoroutineSchedulerStressTest : TestBase() { |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 18 | private var dispatcher: ExperimentalCoroutineDispatcher = ExperimentalCoroutineDispatcher() |
| 19 | private val observedThreads = ConcurrentHashMap<Thread, Long>() |
Vsevolod Tolstopyatov | 6839151 | 2018-08-13 12:46:53 +0300 | [diff] [blame^] | 20 | private val tasksNum = 500_000 * stressMemoryMultiplier() |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 21 | |
| 22 | private fun stressMemoryMultiplier(): Int { |
| 23 | return if (isStressTest) { |
Roman Elizarov | 21ce9c0 | 2018-08-10 20:38:41 +0300 | [diff] [blame] | 24 | AVAILABLE_PROCESSORS * 4 |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 25 | } else { |
| 26 | 1 |
| 27 | } |
| 28 | } |
| 29 | |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 30 | private val processed = AtomicInteger(0) |
| 31 | private val finishLatch = CountDownLatch(1) |
| 32 | |
| 33 | @After |
| 34 | fun tearDown() { |
| 35 | dispatcher.close() |
| 36 | } |
| 37 | |
| 38 | @Test |
| 39 | fun testExternalTasksSubmission() { |
| 40 | stressTest(CommonPool) |
| 41 | } |
| 42 | |
| 43 | @Test |
| 44 | fun testInternalTasksSubmission() { |
| 45 | stressTest(dispatcher) |
| 46 | } |
| 47 | |
| 48 | @Test |
| 49 | fun testStealingFromBlocking() { |
| 50 | /* |
| 51 | * Work-stealing stress test, |
| 52 | * one thread submits pack of tasks, waits until they are completed (to avoid work offloading) |
| 53 | * and then repeats, thus never executing its own tasks and relying only on work stealing. |
| 54 | */ |
| 55 | var blockingThread: Thread? = null |
| 56 | dispatcher.dispatch(EmptyCoroutineContext, Runnable { |
| 57 | // Submit million tasks |
| 58 | blockingThread = Thread.currentThread() |
| 59 | var submittedTasks = 0 |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 60 | while (submittedTasks < tasksNum) { |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 61 | |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 62 | ++submittedTasks |
| 63 | dispatcher.dispatch(EmptyCoroutineContext, Runnable { |
| 64 | processTask() |
| 65 | }) |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 66 | |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 67 | while (submittedTasks - processed.get() > 100) { |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 68 | Thread.yield() |
| 69 | } |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 70 | } |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 71 | |
| 72 | // Block current thread |
| 73 | finishLatch.await() |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 74 | }) |
| 75 | |
| 76 | finishLatch.await() |
| 77 | |
| 78 | require(blockingThread!! !in observedThreads) |
| 79 | validateResults() |
| 80 | } |
| 81 | |
| 82 | private fun stressTest(submissionInitiator: CoroutineDispatcher) { |
| 83 | /* |
Roman Elizarov | 21ce9c0 | 2018-08-10 20:38:41 +0300 | [diff] [blame] | 84 | * Run 2 million tasks and validate that |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 85 | * 1) All of them are completed successfully |
| 86 | * 2) Every thread executed task at least once |
| 87 | */ |
| 88 | submissionInitiator.dispatch(EmptyCoroutineContext, Runnable { |
| 89 | for (i in 1..tasksNum) { |
| 90 | dispatcher.dispatch(EmptyCoroutineContext, Runnable { |
| 91 | processTask() |
| 92 | }) |
| 93 | } |
| 94 | }) |
| 95 | |
| 96 | finishLatch.await() |
Roman Elizarov | 21ce9c0 | 2018-08-10 20:38:41 +0300 | [diff] [blame] | 97 | val n = observedThreads.size |
| 98 | println("Observed $n threads with $AVAILABLE_PROCESSORS available processors") |
| 99 | assertTrue(AVAILABLE_PROCESSORS in (n - 1)..n) |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 100 | validateResults() |
| 101 | } |
| 102 | |
| 103 | private fun processTask() { |
| 104 | val counter = observedThreads[Thread.currentThread()] ?: 0L |
| 105 | observedThreads[Thread.currentThread()] = counter + 1 |
| 106 | |
| 107 | if (processed.incrementAndGet() == tasksNum) { |
| 108 | finishLatch.countDown() |
| 109 | } |
| 110 | } |
| 111 | |
| 112 | private fun validateResults() { |
| 113 | val result = observedThreads.values.sum() |
| 114 | assertEquals(tasksNum.toLong(), result) |
Roman Elizarov | 21ce9c0 | 2018-08-10 20:38:41 +0300 | [diff] [blame] | 115 | checkPoolThreadsCreated(AVAILABLE_PROCESSORS) |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 116 | } |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 117 | } |