// blob: db60f4cee683116ad09742f654e6459431c60d8c [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.selects.select
import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.util.*
import java.util.concurrent.atomic.AtomicReference
/**
 * Stress test for cancel atomicity of channel `send` / `receive`, covering both the plain
 * suspending calls and their `select` clause counterparts.
 *
 * A single sender and a single receiver coroutine run on [CommonPool] while the test coroutine
 * keeps cancelling and relaunching them at random. The sender emits consecutive integers, so the
 * receiver can detect a lost element (a gap) or a duplicated element (a repeat or step back).
 * The test is executed once for every [TestChannelKind] value (see [params]).
 */
@RunWith(Parameterized::class)
class ChannelAtomicCancelStressTest(val kind: TestChannelKind) : TestBase() {
    /** How long the cancel/restart loop runs, in milliseconds (scaled by `stressTestMultiplier`). */
    val TEST_DURATION = 3000L * stressTestMultiplier

    /** Channel under test, created by the parameterized [kind]. */
    val channel = kind.create()

    // One-slot channels through which a worker reports that it has left its `cancellable` section.
    val senderDone = ArrayChannel<Boolean>(1)
    val receiverDone = ArrayChannel<Boolean>(1)

    // Last value confirmed as sent / last value taken from the channel.
    var lastSent = 0
    var lastReceived = 0

    // How many times each worker was cancelled by the test loop.
    var stoppedSender = 0
    var stoppedReceiver = 0

    // Anomalies observed by the receiver: gaps and repeats in the integer sequence.
    var missedCnt = 0
    var dupCnt = 0

    /** First unexpected failure recorded by any coroutine; stays `null` while all is well. */
    val failed = AtomicReference<Throwable>()

    lateinit var sender: Job
    lateinit var receiver: Job

    /** Records [e] as the test failure unless one is already stored; returns the CAS result. */
    fun fail(e: Throwable) = failed.compareAndSet(null, e)

    /**
     * Runs [block] and, no matter how it terminates, offers `true` to [done].
     *
     * A [CancellationException] is rethrown untouched; any other throwable is first recorded via
     * [fail] and then rethrown. If the offer to [done] is rejected, that is recorded as a failure too.
     */
    inline fun cancellable(done: ArrayChannel<Boolean>, block: () -> Unit) {
        try {
            block()
        } catch (e: CancellationException) {
            // Cancellation is the expected way out of the worker loops.
            throw e
        } catch (e: Throwable) {
            fail(e)
            throw e
        } finally {
            val offered = done.offer(true)
            if (!offered) {
                fail(IllegalStateException("failed to offer to done channel"))
            }
        }
    }

    /**
     * Randomly cancels and restarts the sender and receiver until [TEST_DURATION] elapses or a
     * failure is recorded, then stops both, prints statistics and checks the counters.
     */
    @Test
    fun testAtomicCancelStress() = runBlocking<Unit> {
        println("--- ChannelAtomicCancelStressTest $kind")
        val stopAt = System.currentTimeMillis() + TEST_DURATION
        launchSender()
        launchReceiver()
        val random = Random()
        while (true) {
            val timeLeft = System.currentTimeMillis() < stopAt
            if (!timeLeft || failed.get() != null) break
            when (random.nextInt(3)) {
                0 -> {
                    // cancel & restart sender
                    stopSender()
                    launchSender()
                }
                1 -> {
                    // cancel & restart receiver
                    stopReceiver()
                    launchReceiver()
                }
                2 -> yield() // just yield (burn a little time)
            }
        }
        stopSender()
        stopReceiver()
        println(" Sent $lastSent ints to channel")
        println(" Received $lastReceived ints from channel")
        println(" Stopped sender $stoppedSender times")
        println("Stopped receiver $stoppedReceiver times")
        println(" Missed $missedCnt ints")
        println(" Duplicated $dupCnt ints")
        // Surface a failure recorded by a worker before looking at the counters.
        val failure = failed.get()
        if (failure != null) throw failure
        assertEquals(0, dupCnt)
        // The missed/last-value checks are applied to non-conflated kinds only.
        if (kind.isConflated) return@runBlocking
        assertEquals(0, missedCnt)
        assertEquals(lastSent, lastReceived)
    }

    /**
     * Starts a new sender coroutine on [CommonPool] with [CoroutineStart.ATOMIC] and stores it in
     * [sender]. The coroutine keeps sending `lastSent + 1`, choosing at random between `send` and
     * a `select` with `onSend`, and advances [lastSent] only after the operation has returned.
     */
    fun launchSender() {
        sender = launch(CommonPool, start = CoroutineStart.ATOMIC) {
            val random = Random()
            cancellable(senderDone) {
                var iterations = 0
                while (true) {
                    val candidate = lastSent + 1
                    when (random.nextInt(2)) {
                        0 -> channel.send(candidate)
                        1 -> select { channel.onSend(candidate) {} }
                        else -> error("cannot happen")
                    }
                    lastSent = candidate // update on success
                    // yield periodically to check cancellation on LinkedListChannel
                    val timeToYield = iterations % 1000 == 0
                    iterations++
                    if (timeToYield) yield()
                }
            }
        }
    }

    /** Cancels the current [sender] and suspends until it has signalled through [senderDone]. */
    suspend fun stopSender() {
        stoppedSender += 1
        sender.cancel()
        senderDone.receive()
    }

    /**
     * Starts a new receiver coroutine on [CommonPool] with [CoroutineStart.ATOMIC] and stores it
     * in [receiver]. Each received value is compared with `lastReceived + 1`: a larger value bumps
     * [missedCnt], a smaller one bumps [dupCnt]; then [lastReceived] is set to the value.
     */
    fun launchReceiver() {
        receiver = launch(CommonPool, start = CoroutineStart.ATOMIC) {
            val random = Random()
            cancellable(receiverDone) {
                while (true) {
                    val value = when (random.nextInt(2)) {
                        0 -> channel.receive()
                        1 -> select { channel.onReceive { it } }
                        else -> error("cannot happen")
                    }
                    val next = lastReceived + 1
                    when {
                        value > next -> missedCnt++
                        value < next -> dupCnt++
                    }
                    lastReceived = value
                }
            }
        }
    }

    /** Cancels the current [receiver] and suspends until it has signalled through [receiverDone]. */
    suspend fun stopReceiver() {
        stoppedReceiver += 1
        receiver.cancel()
        receiverDone.receive()
    }

    companion object {
        /** Supplies one parameter set per [TestChannelKind] value to the Parameterized runner. */
        @Parameterized.Parameters(name = "{0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> =
            TestChannelKind.values().map { channelKind -> arrayOf<Any>(channelKind) }
    }
}