Vsevolod Tolstopyatov | b23729e | 2018-04-09 16:30:31 +0300 | [diff] [blame] | 1 | package kotlinx.coroutines.experimental.scheduling |
| 2 | |
| 3 | import kotlinx.coroutines.experimental.* |
| 4 | import org.junit.* |
| 5 | import java.util.concurrent.* |
| 6 | |
| 7 | class BlockingCoroutineDispatcherTest : SchedulerTestBase() { |
| 8 | |
| 9 | @Test(timeout = 1_000) |
| 10 | fun testNonBlockingWithBlockingExternal() = runBlocking { |
| 11 | val barrier = CyclicBarrier(2) |
| 12 | |
| 13 | val blockingJob = launch(blockingDispatcher.value) { |
| 14 | barrier.await() |
| 15 | } |
| 16 | |
| 17 | val nonBlockingJob = launch(dispatcher) { |
| 18 | barrier.await() |
| 19 | } |
| 20 | |
| 21 | nonBlockingJob.join() |
| 22 | blockingJob.join() |
| 23 | checkPoolThreadsCreated(2) |
| 24 | } |
| 25 | |
| 26 | @Test(timeout = 10_000) |
| 27 | fun testNonBlockingFromBlocking() = runBlocking { |
| 28 | val barrier = CyclicBarrier(2) |
| 29 | |
| 30 | val blocking = launch(blockingDispatcher.value) { |
| 31 | // This task will be stolen |
| 32 | launch(dispatcher) { |
| 33 | barrier.await() |
| 34 | } |
| 35 | |
| 36 | barrier.await() |
| 37 | } |
| 38 | |
| 39 | blocking.join() |
| 40 | checkPoolThreadsCreated(2) |
| 41 | } |
| 42 | |
| 43 | @Test(timeout = 1_000) |
| 44 | fun testScheduleBlockingThreadCount() = runTest { |
| 45 | // After first iteration pool is idle, repeat, no new threads should be created |
| 46 | repeat(2) { |
| 47 | val blocking = launch(blockingDispatcher.value) { |
| 48 | launch(blockingDispatcher.value) { |
| 49 | } |
| 50 | } |
| 51 | |
| 52 | blocking.join() |
| 53 | // Depends on how fast thread will be created |
| 54 | checkPoolThreadsCreated(2..3) |
| 55 | } |
| 56 | } |
| 57 | |
| 58 | @Test(timeout = 1_000) |
Vsevolod Tolstopyatov | b23729e | 2018-04-09 16:30:31 +0300 | [diff] [blame] | 59 | fun testNoCpuStarvation() = runBlocking { |
| 60 | val tasksNum = 100 |
| 61 | val barrier = CyclicBarrier(tasksNum + 1) |
| 62 | val tasks = (1..tasksNum).map { launch(blockingDispatcher.value) { barrier.await() } } |
| 63 | |
| 64 | val cpuTask = launch(dispatcher) { |
| 65 | // Do nothing, just complete |
| 66 | } |
| 67 | |
| 68 | cpuTask.join() |
| 69 | tasks.forEach { require(it.isActive) } |
| 70 | barrier.await() |
| 71 | tasks.joinAll() |
| 72 | checkPoolThreadsCreated(101) |
| 73 | } |
| 74 | |
| 75 | @Test(timeout = 1_000) |
| 76 | fun testNoCpuStarvationWithMultipleBlockingContexts() = runBlocking { |
| 77 | val firstBarrier = CyclicBarrier(11) |
| 78 | val secondBarrier = CyclicBarrier(11) |
| 79 | val blockingDispatcher = blockingDispatcher(10) |
| 80 | val blockingDispatcher2 = blockingDispatcher(10) |
| 81 | |
| 82 | val blockingTasks = (1..10).flatMap { |
| 83 | listOf(launch(blockingDispatcher) { firstBarrier.await() }, launch(blockingDispatcher2) { secondBarrier.await() }) |
| 84 | } |
| 85 | |
| 86 | val cpuTasks = (1..100).map { |
| 87 | launch(dispatcher) { |
| 88 | // Do nothing, just complete |
| 89 | } |
| 90 | }.toList() |
| 91 | |
| 92 | cpuTasks.joinAll() |
| 93 | blockingTasks.forEach { require(it.isActive) } |
| 94 | firstBarrier.await() |
| 95 | secondBarrier.await() |
| 96 | blockingTasks.joinAll() |
| 97 | checkPoolThreadsCreated(21..22) |
| 98 | } |
| 99 | |
| 100 | @Test(timeout = 1_000) |
| 101 | fun testNoExcessThreadsCreated() = runBlocking { |
| 102 | corePoolSize = 4 |
| 103 | |
| 104 | val tasksNum = 100 |
| 105 | val barrier = CyclicBarrier(tasksNum + 1) |
| 106 | val blockingTasks = (1..tasksNum).map { launch(blockingDispatcher.value) { barrier.await() } } |
| 107 | |
| 108 | val nonBlockingTasks = (1..tasksNum).map { |
| 109 | launch(dispatcher) { |
| 110 | yield() |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | nonBlockingTasks.joinAll() |
| 115 | barrier.await() |
| 116 | blockingTasks.joinAll() |
| 117 | // There may be race when multiple CPU threads are trying to lazily created one more |
Vsevolod Tolstopyatov | fb6f240 | 2018-08-08 19:28:47 +0300 | [diff] [blame] | 118 | checkPoolThreadsCreated(104..120) |
Vsevolod Tolstopyatov | b23729e | 2018-04-09 16:30:31 +0300 | [diff] [blame] | 119 | } |
| 120 | |
| 121 | @Test |
| 122 | fun testBlockingFairness() = runBlocking { |
| 123 | corePoolSize = 1 |
| 124 | maxPoolSize = 1 |
| 125 | |
| 126 | val blocking = blockingDispatcher(1) |
| 127 | val task = async(dispatcher) { |
| 128 | expect(1) |
| 129 | |
| 130 | val nonBlocking = async(dispatcher) { |
| 131 | expect(3) |
| 132 | } |
| 133 | |
| 134 | val firstBlocking = async(blocking) { |
| 135 | expect(2) |
| 136 | } |
| 137 | |
| 138 | val secondBlocking = async(blocking) { |
| 139 | // Already have 1 queued blocking task, so this one wouldn't be scheduled to head |
| 140 | expect(4) |
| 141 | } |
| 142 | |
| 143 | listOf(firstBlocking, nonBlocking, secondBlocking).joinAll() |
| 144 | finish(5) |
| 145 | } |
| 146 | |
| 147 | task.await() |
| 148 | } |
| 149 | |
| 150 | @Test |
| 151 | fun testBoundedBlockingFairness() = runBlocking { |
| 152 | corePoolSize = 1 |
| 153 | maxPoolSize = 1 |
| 154 | |
| 155 | val blocking = blockingDispatcher(2) |
| 156 | val task = async(dispatcher) { |
| 157 | expect(1) |
| 158 | |
| 159 | val nonBlocking = async(dispatcher) { |
| 160 | expect(3) |
| 161 | } |
| 162 | |
| 163 | val firstBlocking = async(blocking) { |
| 164 | expect(4) |
| 165 | } |
| 166 | |
| 167 | val secondNonBlocking = async(dispatcher) { |
| 168 | expect(5) |
| 169 | } |
| 170 | |
| 171 | val secondBlocking = async(blocking) { |
| 172 | expect(2) // <- last submitted blocking is executed first |
| 173 | } |
| 174 | |
| 175 | val thirdBlocking = async(blocking) { |
| 176 | expect(6) // parallelism level is reached before this task |
| 177 | } |
| 178 | |
| 179 | listOf(firstBlocking, nonBlocking, secondBlocking, secondNonBlocking, thirdBlocking).joinAll() |
| 180 | finish(7) |
| 181 | } |
| 182 | |
| 183 | task.await() |
| 184 | } |
| 185 | |
| 186 | @Test(timeout = 1_000) |
| 187 | fun testYield() = runBlocking { |
| 188 | corePoolSize = 1 |
| 189 | maxPoolSize = 1 |
| 190 | val ds = blockingDispatcher(1) |
| 191 | val outerJob = launch(ds) { |
| 192 | expect(1) |
| 193 | val innerJob = launch(ds) { |
| 194 | // Do nothing |
| 195 | expect(3) |
| 196 | } |
| 197 | |
| 198 | expect(2) |
| 199 | while (innerJob.isActive) { |
| 200 | yield() |
| 201 | } |
| 202 | |
| 203 | expect(4) |
| 204 | innerJob.join() |
| 205 | } |
| 206 | |
| 207 | outerJob.join() |
| 208 | finish(5) |
| 209 | } |
| 210 | |
| 211 | @Test(expected = IllegalArgumentException::class) |
| 212 | fun testNegativeParallelism() { |
| 213 | blockingDispatcher(-1) |
| 214 | } |
| 215 | |
| 216 | @Test(expected = IllegalArgumentException::class) |
| 217 | fun testZeroParallelism() { |
| 218 | blockingDispatcher(0) |
| 219 | } |
| 220 | } |