blob: 5453ad8d6d2e28a98d4f5698a97e4cf48b0d9eb9 [file] [log] [blame]
Vsevolod Tolstopyatovb23729e2018-04-09 16:30:31 +03001package kotlinx.coroutines.experimental.scheduling
2
3import kotlinx.coroutines.experimental.*
4import org.junit.*
5import java.util.concurrent.*
6
/**
 * Tests that launch coroutines on the blocking dispatchers obtained through
 * `blockingDispatcher.value` / `blockingDispatcher(parallelism)` next to coroutines on the
 * regular `dispatcher`, and then call `checkPoolThreadsCreated` with the thread count
 * (or range of counts) each scenario is supposed to produce.
 *
 * NOTE(review): `dispatcher`, `blockingDispatcher`, `corePoolSize`, `maxPoolSize`,
 * `checkPoolThreadsCreated`, `expect`, `finish` and `runTest` are not declared in this file;
 * presumably they come from [SchedulerTestBase] or its parents — confirm there.
 */
class BlockingCoroutineDispatcherTest : SchedulerTestBase() {

    /**
     * One coroutine on the blocking dispatcher and one on the regular dispatcher meet at a
     * two-party barrier; both must complete and the pool is checked against 2 threads.
     */
    @Test(timeout = 1_000)
    fun testNonBlockingWithBlockingExternal() = runBlocking {
        val rendezvous = CyclicBarrier(2)

        val blockingJob = launch(blockingDispatcher.value) {
            rendezvous.await()
        }

        val nonBlockingJob = launch(dispatcher) {
            rendezvous.await()
        }

        nonBlockingJob.join()
        blockingJob.join()
        checkPoolThreadsCreated(2)
    }

    /**
     * A coroutine on the blocking dispatcher launches a coroutine on the regular dispatcher
     * and then blocks on the same two-party barrier the nested coroutine awaits.
     */
    @Test(timeout = 10_000)
    fun testNonBlockingFromBlocking() = runBlocking {
        val rendezvous = CyclicBarrier(2)

        val outer = launch(blockingDispatcher.value) {
            // This task will be stolen
            launch(dispatcher) {
                rendezvous.await()
            }

            // Blocks the current thread until the nested task reaches the barrier as well.
            rendezvous.await()
        }

        outer.join()
        checkPoolThreadsCreated(2)
    }

    /**
     * Runs the same "blocking coroutine launching an empty blocking coroutine" scenario twice
     * and checks the thread count after each round.
     */
    @Test(timeout = 1_000)
    fun testScheduleBlockingThreadCount() = runTest {
        // After the first iteration the pool is idle; repeating must not create new threads.
        repeat(2) {
            val outer = launch(blockingDispatcher.value) {
                launch(blockingDispatcher.value) {
                }
            }

            outer.join()
            // The exact number depends on how fast a thread gets created.
            checkPoolThreadsCreated(2..3)
        }
    }

    /**
     * Parks `tasksNum` coroutines of the blocking dispatcher on a barrier and verifies that an
     * empty coroutine on the regular dispatcher still completes while all of them are active.
     */
    @Test(timeout = 1_000)
    fun testNoCpuStarvation() = runBlocking {
        val tasksNum = 100
        // One extra party for this test coroutine, which releases the barrier below.
        val barrier = CyclicBarrier(tasksNum + 1)
        val tasks = (1..tasksNum).map { launch(blockingDispatcher.value) { barrier.await() } }

        val cpuTask = launch(dispatcher) {
            // Do nothing, just complete
        }

        cpuTask.join()
        tasks.forEach { task ->
            require(task.isActive) { "Blocking task completed before the barrier was released" }
        }
        barrier.await()
        tasks.joinAll()
        // Same value as the former literal 101, now tied to tasksNum.
        // NOTE(review): presumably one thread per blocking task plus one for the CPU task — confirm.
        checkPoolThreadsCreated(tasksNum + 1)
    }

    /**
     * Same idea as [testNoCpuStarvation], but the blocking coroutines are spread over two
     * separate dispatchers created with `blockingDispatcher(10)`, each with its own barrier.
     */
    @Test(timeout = 1_000)
    fun testNoCpuStarvationWithMultipleBlockingContexts() = runBlocking {
        val firstBarrier = CyclicBarrier(11)
        val secondBarrier = CyclicBarrier(11)
        // Named so that the locals do not shadow the inherited `blockingDispatcher` member.
        val firstDispatcher = blockingDispatcher(10)
        val secondDispatcher = blockingDispatcher(10)

        val blockingTasks = (1..10).flatMap {
            listOf(
                launch(firstDispatcher) { firstBarrier.await() },
                launch(secondDispatcher) { secondBarrier.await() }
            )
        }

        val cpuTasks = (1..100).map {
            launch(dispatcher) {
                // Do nothing, just complete
            }
        }

        cpuTasks.joinAll()
        blockingTasks.forEach { task ->
            require(task.isActive) { "Blocking task completed before its barrier was released" }
        }
        firstBarrier.await()
        secondBarrier.await()
        blockingTasks.joinAll()
        checkPoolThreadsCreated(21..22)
    }

    /**
     * With `corePoolSize` set to 4, parks `tasksNum` blocking coroutines on a barrier, runs
     * `tasksNum` yielding coroutines on the regular dispatcher, and checks the thread count range.
     */
    @Test(timeout = 1_000)
    fun testNoExcessThreadsCreated() = runBlocking {
        corePoolSize = 4

        val tasksNum = 100
        val barrier = CyclicBarrier(tasksNum + 1)
        val blockingTasks = (1..tasksNum).map { launch(blockingDispatcher.value) { barrier.await() } }

        val nonBlockingTasks = (1..tasksNum).map {
            launch(dispatcher) {
                yield()
            }
        }

        nonBlockingTasks.joinAll()
        barrier.await()
        blockingTasks.joinAll()
        // There may be a race when multiple CPU threads are trying to lazily create one more
        checkPoolThreadsCreated(104..120)
    }

    /**
     * With `corePoolSize` and `maxPoolSize` set to 1, submits one regular and two blocking
     * (parallelism 1) tasks. The `expect` indices encode the execution order the test requires,
     * so the submission order below must not be changed.
     */
    @Test
    fun testBlockingFairness() = runBlocking {
        corePoolSize = 1
        maxPoolSize = 1

        val blocking = blockingDispatcher(1)
        val task = async(dispatcher) {
            expect(1)

            val nonBlocking = async(dispatcher) {
                expect(3)
            }

            val firstBlocking = async(blocking) {
                expect(2)
            }

            val secondBlocking = async(blocking) {
                // Already have 1 queued blocking task, so this one wouldn't be scheduled to head
                expect(4)
            }

            listOf(firstBlocking, nonBlocking, secondBlocking).joinAll()
            finish(5)
        }

        task.await()
    }

    /**
     * Variant of [testBlockingFairness] with a blocking dispatcher of parallelism 2 and an
     * additional regular and blocking task. The `expect` indices encode the required order,
     * so the submission order below must not be changed.
     */
    @Test
    fun testBoundedBlockingFairness() = runBlocking {
        corePoolSize = 1
        maxPoolSize = 1

        val blocking = blockingDispatcher(2)
        val task = async(dispatcher) {
            expect(1)

            val nonBlocking = async(dispatcher) {
                expect(3)
            }

            val firstBlocking = async(blocking) {
                expect(4)
            }

            val secondNonBlocking = async(dispatcher) {
                expect(5)
            }

            val secondBlocking = async(blocking) {
                expect(2) // <- last submitted blocking is executed first
            }

            val thirdBlocking = async(blocking) {
                expect(6) // parallelism level is reached before this task
            }

            listOf(firstBlocking, nonBlocking, secondBlocking, secondNonBlocking, thirdBlocking).joinAll()
            finish(7)
        }

        task.await()
    }

    /**
     * On a blocking dispatcher of parallelism 1 (pool limited to one thread), the outer
     * coroutine keeps yielding until the inner coroutine it launched is no longer active.
     */
    @Test(timeout = 1_000)
    fun testYield() = runBlocking {
        corePoolSize = 1
        maxPoolSize = 1
        val ds = blockingDispatcher(1)
        val outerJob = launch(ds) {
            expect(1)
            val innerJob = launch(ds) {
                // Do nothing
                expect(3)
            }

            expect(2)
            // Give up the thread repeatedly so that the inner job gets a chance to run.
            while (innerJob.isActive) {
                yield()
            }

            expect(4)
            innerJob.join()
        }

        outerJob.join()
        finish(5)
    }

    /** Requesting a blocking dispatcher with negative parallelism must throw [IllegalArgumentException]. */
    @Test(expected = IllegalArgumentException::class)
    fun testNegativeParallelism() {
        blockingDispatcher(-1)
    }

    /** Requesting a blocking dispatcher with zero parallelism must throw [IllegalArgumentException]. */
    @Test(expected = IllegalArgumentException::class)
    fun testZeroParallelism() {
        blockingDispatcher(0)
    }
}