/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
| 4 | |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 5 | package kotlinx.coroutines.scheduling |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 6 | |
Vsevolod Tolstopyatov | 4236c8c | 2019-10-11 00:35:47 +0300 | [diff] [blame] | 7 | import kotlinx.atomicfu.* |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 8 | import kotlinx.coroutines.* |
| 9 | import kotlinx.coroutines.internal.* |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 10 | import org.junit.* |
| 11 | import org.junit.Test |
| 12 | import java.util.concurrent.* |
| 13 | import java.util.concurrent.atomic.* |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 14 | import kotlin.coroutines.* |
Vsevolod Tolstopyatov | 3ac73f6 | 2018-07-26 16:09:33 +0300 | [diff] [blame] | 15 | import kotlin.test.* |
| 16 | |
/**
 * Stress tests that push [tasksNum] runnables through an [ExperimentalCoroutineDispatcher]
 * and then verify the bookkeeping collected while they ran.
 *
 * Every submitted task is a [ValidatingRunnable]: it raises an error when it is run a second time
 * and otherwise records the executing thread in [observedThreads] and bumps [processed].
 * The task that brings [processed] up to [tasksNum] opens [finishLatch], which is what the
 * test methods block on.
 */
class CoroutineSchedulerStressTest : TestBase() {
    private val dispatcher: ExperimentalCoroutineDispatcher = ExperimentalCoroutineDispatcher()

    // Number of tasks executed per thread. Each thread only ever writes the entry keyed by itself.
    private val observedThreads = ConcurrentHashMap<Thread, Long>()
    private val tasksNum = 500_000 * stressMemoryMultiplier()

    /**
     * Scale factor applied to the base task count: `AVAILABLE_PROCESSORS * 4` when
     * `isStressTest` is set, `1` otherwise.
     */
    private fun stressMemoryMultiplier(): Int = if (isStressTest) AVAILABLE_PROCESSORS * 4 else 1

    // Total number of tasks executed so far, across all threads.
    private val processed = AtomicInteger(0)

    // Opened by the task that brings `processed` up to `tasksNum`.
    private val finishLatch = CountDownLatch(1)

    @After
    fun tearDown() {
        dispatcher.close()
    }

    @Test
    fun testInternalTasksSubmissionProgress() {
        /*
         * Run a lot of tasks and validate that
         * 1) All of them are completed successfully
         * 2) Every thread executed task at least once
         */
        dispatcher.dispatch(EmptyCoroutineContext, Runnable {
            // All tasks are submitted from inside the dispatcher by a single producer task.
            repeat(tasksNum) {
                dispatcher.dispatch(EmptyCoroutineContext, ValidatingRunnable())
            }
        })

        finishLatch.await()
        val observed = observedThreads.size
        // on slow machines not all threads can be observed
        assertTrue(
            observed in (AVAILABLE_PROCESSORS - 1)..(AVAILABLE_PROCESSORS + 1),
            "Observed $observed threads with $AVAILABLE_PROCESSORS available processors"
        )
        validateResults()
    }

    @Test
    fun testStealingFromNonProgressing() {
        /*
         * Work-stealing stress test,
         * one thread submits pack of tasks, waits until they are completed (to avoid work offloading)
         * and then repeats, thus never executing its own tasks and relying only on work stealing.
         */
        var blockingThread: Thread? = null
        dispatcher.dispatch(EmptyCoroutineContext, Runnable {
            // Remember the submitting thread before any task is handed to the dispatcher
            blockingThread = Thread.currentThread()
            var submittedTasks = 0
            // Submit tasksNum tasks, never letting more than 100 of them be outstanding
            while (submittedTasks < tasksNum) {
                ++submittedTasks
                dispatcher.dispatch(EmptyCoroutineContext, ValidatingRunnable())
                while (submittedTasks - processed.get() > 100) {
                    Thread.yield()
                }
            }
            // Block current thread
            finishLatch.await()
        })

        finishLatch.await()

        // The latch opens only after all tasks ran, and every task was submitted after
        // blockingThread had been assigned, so it is expected to be set at this point.
        val submitter = checkNotNull(blockingThread) {
            "Submitting task did not record its thread before the latch was released"
        }
        assertFalse(
            observedThreads.containsKey(submitter),
            "Submitting thread $submitter must not execute any of the submitted tasks"
        )
        validateResults()
    }

    /**
     * Records one executed task for the current thread and opens [finishLatch] once
     * [processed] reaches [tasksNum].
     */
    private fun processTask() {
        // Plain get-then-put is enough here: the key is the current thread, so no other
        // thread updates the same entry.
        val currentThread = Thread.currentThread()
        val counter = observedThreads[currentThread] ?: 0L
        observedThreads[currentThread] = counter + 1
        if (processed.incrementAndGet() == tasksNum) {
            finishLatch.countDown()
        }
    }

    /**
     * Asserts that the per-thread counters in [observedThreads] add up to exactly [tasksNum].
     */
    private fun validateResults() {
        val result = observedThreads.values.sum()
        assertEquals(tasksNum.toLong(), result)
    }

    /**
     * Task submitted by the tests: raises an error if the same instance is run more than once,
     * otherwise delegates to [processTask].
     */
    private inner class ValidatingRunnable : Runnable {
        private val invoked = atomic(false)

        override fun run() {
            if (!invoked.compareAndSet(false, true)) error("The same runnable was invoked twice")
            processTask()
        }
    }
}