blob: b3f257fde0df11195fe0f42498430b792da99bcf [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental
18
19import kotlinx.atomicfu.*
20import java.util.*
21import java.util.concurrent.*
22import kotlin.concurrent.*
23import kotlin.test.*
24
/**
 * Stress test that races registration and disposal of [Job] completion handlers against
 * cancellation of the same job.
 *
 * One "creator" thread publishes a fresh [Job] per round and cancels it, while [nThreads]
 * "handler" threads concurrently install a handler with `invokeOnCompletion` and then dispose it.
 * Rounds are delimited by [cyclicBarrier]: all threads meet once before a round starts and once
 * after it ends. A handler must fire at most once, and never after its round has ended.
 *
 * A failure in any worker thread is recorded, all workers are interrupted so that nobody keeps
 * waiting on the barrier for a thread that is gone, and the failure is rethrown from the test
 * thread once every worker has terminated.
 */
class JobHandlersUpgradeStressTest : TestBase() {
    private val nSeconds = 3 * stressTestMultiplier
    private val nThreads = 4

    // One party for the creator thread plus one per handler thread.
    private val cyclicBarrier = CyclicBarrier(1 + nThreads)
    private val threads = mutableListOf<Thread>()

    // Statistics that are printed while the test runs and at the end of it.
    private val iters = atomic(0)
    private val removed = atomic(0)
    private val fired = atomic(0)

    // Target of the "burn some time" loops; its value is never read.
    private val sink = atomic(0)

    // First failure thrown by any worker thread, or null while everything is fine.
    private val failure = atomic<Throwable?>(null)

    // Set by the test thread to ask the creator to stop publishing new jobs.
    @Volatile
    private var done = false

    // Job of the current round; null tells the handler threads to stop.
    @Volatile
    private var job: Job? = null

    /**
     * Per-registration state of a handler:
     * 0 -- registered and not fired, 1 -- fired, 2 -- the round is over, firing is no longer allowed.
     */
    class State {
        val state = atomic(0)
    }

    /**
     * Creates (but does not start) a worker thread that runs [body].
     *
     * Anything thrown by [body] is remembered in [failure] (first failure wins) and every worker
     * is interrupted: a thread that waits in [CyclicBarrier.await], or enters it with the
     * interrupt flag set, throws instead of blocking, so the whole test winds down instead of
     * hanging on a barrier that can never be tripped again.
     */
    private fun worker(name: String, body: () -> Unit): Thread =
        thread(name = name, start = false) {
            try {
                body()
            } catch (e: Throwable) {
                // Later failures are usually just interrupt/broken-barrier fallout of the first one.
                failure.compareAndSet(null, e)
                threads.forEach { it.interrupt() }
            }
        }

    @Test
    fun testStress() {
        println("--- JobHandlersUpgradeStressTest")
        threads += worker("creator") {
            val rnd = Random()
            while (true) {
                // Publish the job for the next round (or null to stop) before releasing the handlers.
                job = if (done) null else Job()
                cyclicBarrier.await()
                val current = job ?: break
                // burn some time
                repeat(rnd.nextInt(3000)) { sink.incrementAndGet() }
                // cancel job
                current.cancel()
                cyclicBarrier.await()
                iters.incrementAndGet()
            }
        }
        threads += List(nThreads) { threadId ->
            worker("handler-$threadId") {
                val rnd = Random()
                while (true) {
                    val onCancelling = rnd.nextBoolean()
                    val invokeImmediately: Boolean = rnd.nextBoolean()
                    cyclicBarrier.await()
                    val current = job ?: break
                    val state = State()
                    // burn some time
                    repeat(rnd.nextInt(1000)) { sink.incrementAndGet() }
                    val handle =
                        current.invokeOnCompletion(onCancelling = onCancelling, invokeImmediately = invokeImmediately) {
                            // Succeeds only for the first invocation that happens before the round is closed.
                            if (!state.state.compareAndSet(0, 1))
                                error("Fired more than once or too late: state=${state.state.value}")
                        }
                    // burn some time
                    repeat(rnd.nextInt(1000)) { sink.incrementAndGet() }
                    // dispose
                    handle.dispose()
                    cyclicBarrier.await()
                    // The job was cancelled and the handle disposed by now: the handler either fired or was removed.
                    val resultingState = state.state.value
                    when (resultingState) {
                        0 -> removed.incrementAndGet()
                        1 -> fired.incrementAndGet()
                        else -> error("Cannot happen")
                    }
                    // Close the round: from now on any invocation of the handler fails its own CAS above.
                    if (!state.state.compareAndSet(resultingState, 2))
                        error("Cannot fire late: resultingState=$resultingState")
                }
            }
        }
        threads.forEach { it.start() }
        for (second in 1..nSeconds) {
            // Workers have already been told to stop once one of them failed; do not keep sleeping.
            if (failure.value != null) break
            Thread.sleep(1000)
            println("$second: ${iters.value} iterations")
        }
        done = true
        threads.forEach { it.join() }
        // Surface a worker failure on the test thread so that the test fails instead of passing silently.
        failure.value?.let { throw it }
        println(" Completed ${iters.value} iterations")
        println(" Removed handler ${removed.value} times")
        println(" Fired handler ${fired.value} times")
    }
}