blob: d5be824e4e545796c9f680a1869629f6130ee163 [file] [log] [blame]
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +03001package kotlinx.coroutines.experimental
2
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +03003import org.junit.*
4import org.junit.Test
5import java.util.concurrent.*
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +03006
7class AwaitStressTest : TestBase() {
8
9 private class TestException : Exception() {
10 override fun fillInStackTrace(): Throwable = this
11 }
12
Roman Elizarov203abb02018-04-12 12:34:14 +030013 private val iterations = 50_000 * stressTestMultiplier
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030014 private val pool = newFixedThreadPoolContext(4, "AwaitStressTest")
15
16 @After
17 fun tearDown() {
18 pool.close()
19 }
20
21 @Test
22 fun testMultipleExceptions() = runTest {
23
24 repeat(iterations) {
25 val barrier = CyclicBarrier(4)
26
27 val d1 = async(pool) {
28 barrier.await()
29 throw TestException()
30 }
31
32 val d2 = async(pool) {
33 barrier.await()
34 throw TestException()
35 }
36
37 val d3 = async(pool) {
38 barrier.await()
39 1L
40 }
41
42 try {
43 barrier.await()
44 awaitAll(d1, d2, d3)
45 expectUnreached()
46 } catch (e: TestException) {
47 // Expected behaviour
48 }
49
50 barrier.reset()
51 }
52 }
53
54 @Test
55 fun testAwaitAll() = runTest {
56 val barrier = CyclicBarrier(3)
57
58 repeat(iterations) {
59 val d1 = async(pool) {
60 barrier.await()
61 1L
62 }
63
64 val d2 = async(pool) {
65 barrier.await()
66 2L
67 }
68
69 barrier.await()
70 awaitAll(d1, d2)
71 require(d1.isCompleted && d2.isCompleted)
72 barrier.reset()
73 }
74 }
75
76 @Test
77 fun testConcurrentCancellation() = runTest {
78 var cancelledOnce = false
79 repeat(iterations) {
80 val barrier = CyclicBarrier(3)
81
82 val d1 = async(pool) {
83 barrier.await()
84 delay(10_000)
85 yield()
86 }
87
88 val d2 = async(pool) {
89 barrier.await()
90 d1.cancel()
91 }
92
93 barrier.await()
94 try {
95 awaitAll(d1, d2)
96 } catch (e: JobCancellationException) {
97 cancelledOnce = true
98 }
99 }
100
101 require(cancelledOnce) { "Cancellation exception wasn't properly caught" }
102 }
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +0300103
104 @Test
105 fun testMutatingCollection() = runTest {
106 val barrier = CyclicBarrier(4)
107
108 repeat(iterations) {
Roman Elizarov189e9952018-04-26 11:06:37 +0300109 // thread-safe collection that we are going to modify
110 val deferreds = CopyOnWriteArrayList<Deferred<Long>>()
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +0300111
Roman Elizarov189e9952018-04-26 11:06:37 +0300112 deferreds += async(pool) {
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +0300113 barrier.await()
114 1L
115 }
116
Roman Elizarov189e9952018-04-26 11:06:37 +0300117 deferreds += async(pool) {
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +0300118 barrier.await()
119 2L
120 }
121
Roman Elizarov189e9952018-04-26 11:06:37 +0300122 deferreds += async(pool) {
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +0300123 barrier.await()
Roman Elizarov189e9952018-04-26 11:06:37 +0300124 deferreds.removeAt(2)
125 3L
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +0300126 }
127
Roman Elizarov189e9952018-04-26 11:06:37 +0300128 val allJobs = ArrayList(deferreds)
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +0300129 barrier.await()
Roman Elizarov189e9952018-04-26 11:06:37 +0300130 val results = deferreds.awaitAll() // shouldn't hang
131 check(results == listOf(1L, 2L, 3L) || results == listOf(1L, 2L))
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +0300132 allJobs.awaitAll()
133 barrier.reset()
134 }
135 }
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +0300136}