blob: 7d7b33c3632d15d253f93f66b9bd0e79319bf06d [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
import org.junit.After
import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.experimental.*
class ConflatedChannelCloseStressTest : TestBase() {
val nSenders = 2
val testSeconds = 3 * stressTestMultiplier
val curChannel = AtomicReference<ConflatedChannel<Int>>(ConflatedChannel())
val sent = AtomicInteger()
val closed = AtomicInteger()
val received = AtomicInteger()
val pool = newFixedThreadPoolContext(nSenders + 2, "TestStressClose")
@After
fun tearDown() {
pool.close()
}
@Test
fun testStressClose() = runBlocking<Unit> {
println("--- ConflatedChannelCloseStressTest with nSenders=$nSenders")
val senderJobs = List(nSenders) { Job() }
val senders = List(nSenders) { senderId ->
launch(pool) {
var x = senderId
try {
while (isActive) {
try {
curChannel.get().offer(x)
x += nSenders
sent.incrementAndGet()
} catch (e: ClosedSendChannelException) {
// ignore
}
}
} finally {
senderJobs[senderId].cancel()
}
}
}
val closerJob = Job()
val closer = launch(pool) {
try {
while (isActive) {
flipChannel()
closed.incrementAndGet()
yield()
}
} finally {
closerJob.cancel()
}
}
val receiver = async(pool) {
while (isActive) {
curChannel.get().receiveOrNull()
received.incrementAndGet()
}
}
// print stats while running
repeat(testSeconds) {
delay(1000)
printStats()
}
println("Stopping")
senders.forEach { it.cancel() }
closer.cancel()
// wait them to complete
println("waiting for senders...")
senderJobs.forEach { it.join() }
println("waiting for closer...")
closerJob.join()
// close cur channel
println("Closing channel and signalling receiver...")
flipChannel()
curChannel.get().close(StopException())
/// wait for receiver do complete
println("Waiting for receiver...")
try {
receiver.await()
error("Receiver should not complete normally")
} catch (e: StopException) {
// ok
}
// print stats
println("--- done")
printStats()
}
private fun flipChannel() {
val oldChannel = curChannel.get()
val newChannel = ConflatedChannel<Int>()
curChannel.set(newChannel)
check(oldChannel.close())
}
private fun printStats() {
println("sent ${sent.get()}, closed ${closed.get()}, received ${received.get()}")
}
class StopException : Exception()
}