| /* |
| * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| */ |
| |
| package kotlinx.coroutines |
| |
| import kotlinx.atomicfu.* |
| import java.util.* |
| import java.util.concurrent.* |
| import kotlin.concurrent.* |
| import kotlin.test.* |
| |
| class JobHandlersUpgradeStressTest : TestBase() { |
| private val nSeconds = 3 * stressTestMultiplier |
| private val nThreads = 4 |
| |
| private val cyclicBarrier = CyclicBarrier(1 + nThreads) |
| private val threads = mutableListOf<Thread>() |
| |
| private val inters = atomic(0) |
| private val removed = atomic(0) |
| private val fired = atomic(0) |
| |
| private val sink = atomic(0) |
| |
| @Volatile |
| private var done = false |
| |
| @Volatile |
| private var job: Job? = null |
| |
| class State { |
| val state = atomic(0) |
| } |
| |
| @Test |
| fun testStress() { |
| println("--- JobHandlersUpgradeStressTest") |
| threads += thread(name = "creator", start = false) { |
| val rnd = Random() |
| while (true) { |
| job = if (done) null else Job() |
| cyclicBarrier.await() |
| val job = job ?: break |
| // burn some time |
| repeat(rnd.nextInt(3000)) { sink.incrementAndGet() } |
| // cancel job |
| job.cancel() |
| cyclicBarrier.await() |
| inters.incrementAndGet() |
| } |
| } |
| threads += List(nThreads) { threadId -> |
| thread(name = "handler-$threadId", start = false) { |
| val rnd = Random() |
| while (true) { |
| val onCancelling = rnd.nextBoolean() |
| val invokeImmediately: Boolean = rnd.nextBoolean() |
| cyclicBarrier.await() |
| val job = job ?: break |
| val state = State() |
| // burn some time |
| repeat(rnd.nextInt(1000)) { sink.incrementAndGet() } |
| val handle = |
| job.invokeOnCompletion(onCancelling = onCancelling, invokeImmediately = invokeImmediately) { |
| if (!state.state.compareAndSet(0, 1)) |
| error("Fired more than once or too late: state=${state.state.value}") |
| } |
| // burn some time |
| repeat(rnd.nextInt(1000)) { sink.incrementAndGet() } |
| // dispose |
| handle.dispose() |
| cyclicBarrier.await() |
| val resultingState = state.state.value |
| when (resultingState) { |
| 0 -> removed.incrementAndGet() |
| 1 -> fired.incrementAndGet() |
| else -> error("Cannot happen") |
| } |
| if (!state.state.compareAndSet(resultingState, 2)) |
| error("Cannot fire late: resultingState=$resultingState") |
| } |
| } |
| } |
| threads.forEach { it.start() } |
| repeat(nSeconds) { second -> |
| Thread.sleep(1000) |
| println("${second + 1}: ${inters.value} iterations") |
| } |
| done = true |
| threads.forEach { it.join() } |
| println(" Completed ${inters.value} iterations") |
| println(" Removed handler ${removed.value} times") |
| println(" Fired handler ${fired.value} times") |
| |
| } |
| } |