Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| 3 | */ |
| 4 | |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 5 | package kotlinx.coroutines.scheduling |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 6 | |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 7 | import kotlinx.coroutines.* |
| 8 | import kotlinx.coroutines.internal.* |
| 9 | import kotlinx.coroutines.scheduling.SchedulerTestBase.Companion.checkPoolThreadsCreated |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 10 | import org.junit.* |
Roman Elizarov | 7e8b52e | 2018-12-19 17:01:46 +0300 | [diff] [blame] | 11 | import org.junit.Ignore |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 12 | import org.junit.Test |
| 13 | import java.util.concurrent.* |
| 14 | import java.util.concurrent.atomic.* |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 15 | import kotlin.coroutines.* |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 16 | import kotlin.test.* |
| 17 | |
| 18 | class CoroutineSchedulerStressTest : TestBase() { |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 19 | private var dispatcher: ExperimentalCoroutineDispatcher = ExperimentalCoroutineDispatcher() |
| 20 | private val observedThreads = ConcurrentHashMap<Thread, Long>() |
Vsevolod Tolstopyatov | 6839151 | 2018-08-13 12:46:53 +0300 | [diff] [blame] | 21 | private val tasksNum = 500_000 * stressMemoryMultiplier() |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 22 | |
| 23 | private fun stressMemoryMultiplier(): Int { |
| 24 | return if (isStressTest) { |
Roman Elizarov | 21ce9c0 | 2018-08-10 20:38:41 +0300 | [diff] [blame] | 25 | AVAILABLE_PROCESSORS * 4 |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 26 | } else { |
| 27 | 1 |
| 28 | } |
| 29 | } |
| 30 | |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 31 | private val processed = AtomicInteger(0) |
| 32 | private val finishLatch = CountDownLatch(1) |
| 33 | |
| 34 | @After |
| 35 | fun tearDown() { |
| 36 | dispatcher.close() |
| 37 | } |
| 38 | |
| 39 | @Test |
Vsevolod Tolstopyatov | a2d8088 | 2018-09-24 19:51:49 +0300 | [diff] [blame] | 40 | @Suppress("DEPRECATION") |
Roman Elizarov | 7e8b52e | 2018-12-19 17:01:46 +0300 | [diff] [blame] | 41 | @Ignore // this test often fails on windows, todo: figure out how to fix it. See issue #904 |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 42 | fun testExternalTasksSubmission() { |
| 43 | stressTest(CommonPool) |
| 44 | } |
| 45 | |
| 46 | @Test |
| 47 | fun testInternalTasksSubmission() { |
| 48 | stressTest(dispatcher) |
| 49 | } |
| 50 | |
| 51 | @Test |
| 52 | fun testStealingFromBlocking() { |
| 53 | /* |
| 54 | * Work-stealing stress test, |
| 55 | * one thread submits pack of tasks, waits until they are completed (to avoid work offloading) |
| 56 | * and then repeats, thus never executing its own tasks and relying only on work stealing. |
| 57 | */ |
| 58 | var blockingThread: Thread? = null |
| 59 | dispatcher.dispatch(EmptyCoroutineContext, Runnable { |
| 60 | // Submit million tasks |
| 61 | blockingThread = Thread.currentThread() |
| 62 | var submittedTasks = 0 |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 63 | while (submittedTasks < tasksNum) { |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 64 | |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 65 | ++submittedTasks |
| 66 | dispatcher.dispatch(EmptyCoroutineContext, Runnable { |
| 67 | processTask() |
| 68 | }) |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 69 | |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 70 | while (submittedTasks - processed.get() > 100) { |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 71 | Thread.yield() |
| 72 | } |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 73 | } |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 74 | |
| 75 | // Block current thread |
| 76 | finishLatch.await() |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 77 | }) |
| 78 | |
| 79 | finishLatch.await() |
| 80 | |
Vsevolod Tolstopyatov | a672d5f | 2019-01-23 17:49:17 +0300 | [diff] [blame^] | 81 | require(!observedThreads.containsKey(blockingThread!!)) |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 82 | validateResults() |
| 83 | } |
| 84 | |
| 85 | private fun stressTest(submissionInitiator: CoroutineDispatcher) { |
| 86 | /* |
Roman Elizarov | 21ce9c0 | 2018-08-10 20:38:41 +0300 | [diff] [blame] | 87 | * Run 2 million tasks and validate that |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 88 | * 1) All of them are completed successfully |
| 89 | * 2) Every thread executed task at least once |
| 90 | */ |
| 91 | submissionInitiator.dispatch(EmptyCoroutineContext, Runnable { |
| 92 | for (i in 1..tasksNum) { |
| 93 | dispatcher.dispatch(EmptyCoroutineContext, Runnable { |
| 94 | processTask() |
| 95 | }) |
| 96 | } |
| 97 | }) |
| 98 | |
| 99 | finishLatch.await() |
Vsevolod Tolstopyatov | e0cf38f | 2018-08-15 14:24:57 +0300 | [diff] [blame] | 100 | val observed = observedThreads.size |
Roman Elizarov | 873e9a0 | 2018-09-27 21:24:13 +0300 | [diff] [blame] | 101 | // on slow machines not all threads can be observed |
| 102 | assertTrue(observed in (AVAILABLE_PROCESSORS - 1)..(AVAILABLE_PROCESSORS + 1), "Observed $observed threads with $AVAILABLE_PROCESSORS available processors") |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 103 | validateResults() |
| 104 | } |
| 105 | |
| 106 | private fun processTask() { |
| 107 | val counter = observedThreads[Thread.currentThread()] ?: 0L |
| 108 | observedThreads[Thread.currentThread()] = counter + 1 |
| 109 | |
| 110 | if (processed.incrementAndGet() == tasksNum) { |
| 111 | finishLatch.countDown() |
| 112 | } |
| 113 | } |
| 114 | |
| 115 | private fun validateResults() { |
| 116 | val result = observedThreads.values.sum() |
| 117 | assertEquals(tasksNum.toLong(), result) |
Roman Elizarov | 21ce9c0 | 2018-08-10 20:38:41 +0300 | [diff] [blame] | 118 | checkPoolThreadsCreated(AVAILABLE_PROCESSORS) |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 119 | } |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 120 | } |