Throttle fast senders in ChannelSendReceiveStressTest to prevent OOM with LinkedListChannel stress test
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
index be625ef..44bfe1d 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
@@ -46,10 +46,13 @@
val timeLimit = 30_000L * stressTestMultiplier // 30 sec
val nEvents = 1_000_000 * stressTestMultiplier
+ val maxBuffer = 10_000 // artifical limit for LinkedListChannel
+
val channel = kind.create()
val sendersCompleted = AtomicInteger()
val receiversCompleted = AtomicInteger()
val dupes = AtomicInteger()
+ val sentTotal = AtomicInteger()
val received = AtomicIntegerArray(nEvents)
val receivedTotal = AtomicInteger()
val receivedBy = IntArray(nReceivers)
@@ -90,6 +93,7 @@
println("Tested $kind with nSenders=$nSenders, nReceivers=$nReceivers")
println("Completed successfully ${sendersCompleted.get()} sender coroutines")
println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
+ println(" Sent ${sentTotal.get()} events")
println(" Received ${receivedTotal.get()} events")
println(" Received dupes ${dupes.get()}")
repeat(nReceivers) { receiveIndex ->
@@ -98,20 +102,31 @@
assertEquals(nSenders, sendersCompleted.get())
assertEquals(nReceivers, receiversCompleted.get())
assertEquals(0, dupes.get())
+ assertEquals(nEvents, sentTotal.get())
assertEquals(nEvents, receivedTotal.get())
repeat(nReceivers) { receiveIndex ->
assertTrue("Each receiver should have received something", receivedBy[receiveIndex] > 0)
}
}
+ private suspend fun doSent() {
+ sentTotal.incrementAndGet()
+ while (sentTotal.get() > receivedTotal.get() + maxBuffer)
+ yield() // throttle fast senders to prevent OOM with LinkedListChannel
+ }
+
private suspend fun doSend(senderIndex: Int) {
- for (i in senderIndex until nEvents step nSenders)
+ for (i in senderIndex until nEvents step nSenders) {
channel.send(i)
+ doSent()
+ }
}
private suspend fun doSendSelect(senderIndex: Int) {
- for (i in senderIndex until nEvents step nSenders)
+ for (i in senderIndex until nEvents step nSenders) {
select<Unit> { channel.onSend(i) { Unit } }
+ doSent()
+ }
}
private fun doReceived(receiverIndex: Int, event: Int) {