BroadcastChannelMultiReceiveStressTest streamlined,
consistent stdout headers in all stress-tests
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt
index 220afbe..4ac9535 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt
@@ -40,7 +40,7 @@
}
private val nReceivers = if (isStressTest) 10 else 5
- private val nSeconds = 5 * stressTestMultiplier
+ private val nSeconds = 3 * stressTestMultiplier
private val broadcast = kind.create<Long>()
private val pool = newFixedThreadPoolContext(nReceivers + 1, "BroadcastChannelMultiReceiveStressTest")
@@ -57,6 +57,7 @@
@Test
fun testStress() = runBlocking {
+ println("--- BroadcastChannelMultiReceiveStressTest $kind with nReceivers=$nReceivers")
val ctx = pool + coroutineContext[Job]!!
val sender =
launch(context = ctx + CoroutineName("Sender")) {
@@ -67,31 +68,35 @@
}
}
val receivers = mutableListOf<Job>()
- repeat(nSeconds) { sec ->
- // launch new receiver up to max
- if (receivers.size < nReceivers) {
- val receiverIndex = receivers.size
- val name = "Receiver$receiverIndex"
- println("$sec: Launching $name")
- receivers += launch(ctx + CoroutineName(name)) {
- broadcast.openSubscription().use { sub ->
- when (receiverIndex % 5) {
- 0 -> doReceive(sub, receiverIndex)
- 1 -> doReceiveOrNull(sub, receiverIndex)
- 2 -> doIterator(sub, receiverIndex)
- 3 -> doReceiveSelect(sub, receiverIndex)
- 4 -> doReceiveSelectOrNull(sub, receiverIndex)
- }
+ fun printProgress() {
+ println("Sent ${sentTotal.get()}, received ${receivedTotal.get()}, receivers=${receivers.size}")
+ }
+ // ramp up receivers
+ repeat(nReceivers) {
+ delay(100) // wait 0.1 sec
+ val receiverIndex = receivers.size
+ val name = "Receiver$receiverIndex"
+ println("Launching $name")
+ receivers += launch(ctx + CoroutineName(name)) {
+ broadcast.openSubscription().use { sub ->
+ when (receiverIndex % 5) {
+ 0 -> doReceive(sub, receiverIndex)
+ 1 -> doReceiveOrNull(sub, receiverIndex)
+ 2 -> doIterator(sub, receiverIndex)
+ 3 -> doReceiveSelect(sub, receiverIndex)
+ 4 -> doReceiveSelectOrNull(sub, receiverIndex)
}
}
}
- // wait a sec
- delay(100)
- // print progress
- println("${sec + 1}: Sent ${sentTotal.get()}, received ${receivedTotal.get()}, receivers=${receivers.size}")
+ printProgress()
+ }
+ // wait
+ repeat(nSeconds) { sec ->
+ delay(1000)
+ printProgress()
}
sender.cancelAndJoin()
- println("Tested with nReceivers=$nReceivers")
+ println("Tested $kind with nReceivers=$nReceivers")
val total = sentTotal.get()
println(" Sent $total events, waiting for receivers")
stopOnReceive.set(total)
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelSubStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelSubStressTest.kt
index 5a4a4df..45a1566 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelSubStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelSubStressTest.kt
@@ -47,6 +47,7 @@
@Test
fun testStress() = runBlocking {
+ println("--- BroadcastChannelSubStressTest $kind")
val ctx = coroutineContext + CommonPool
val sender =
launch(context = ctx + CoroutineName("Sender")) {
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
index 334bf7f..71da1e4 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
@@ -72,6 +72,7 @@
@Test
fun testAtomicCancelStress() = runBlocking<Unit> {
+ println("--- ChannelAtomicCancelStressTest $kind")
val deadline = System.currentTimeMillis() + TEST_DURATION
launchSender()
launchReceiver()
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
index fad0437..98fc716 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
@@ -59,8 +59,7 @@
@Test
fun testSendReceiveStress() = runBlocking {
- println("-------------------------------------")
- println("Testing $kind with nSenders=$nSenders, nReceivers=$nReceivers")
+ println("--- ChannelSendReceiveStressTest $kind with nSenders=$nSenders, nReceivers=$nReceivers")
val receivers = List(nReceivers) { receiverIndex ->
// different event receivers use different code
launch(CommonPool + CoroutineName("receiver$receiverIndex")) {
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelNotifyStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelNotifyStressTest.kt
index 030fc5e..bc5324e 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelNotifyStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelNotifyStressTest.kt
@@ -37,6 +37,7 @@
@Test
fun testStressNotify()= runBlocking<Unit> {
+ println("--- ConflatedBroadcastChannelNotifyStressTest")
val senders = List(nSenders) { senderId ->
launch(CommonPool + CoroutineName("Sender$senderId")) {
repeat(nEvents) { i ->
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt
index fbc55bc..dfa6f8d 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt
@@ -41,6 +41,7 @@
@Test
fun testStressClose() = runBlocking<Unit> {
+ println("--- ConflatedChannelCloseStressTest with nSenders=$nSenders")
val senderJobs = List(nSenders) { Job() }
val senders = List(nSenders) { senderId ->
launch(pool) {
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListLongStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListLongStressTest.kt
index dc5d931..ace0cfb 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListLongStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListLongStressTest.kt
@@ -43,6 +43,7 @@
@Test
fun testStress() {
+ println("--- LockFreeLinkedListLongStressTest")
for (j in 0 until nAddThreads)
threads += thread(start = false, name = "adder-$j") {
for (i in j until nAdded step nAddThreads) {
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListShortStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListShortStressTest.kt
index 5a7376f..3680ab2 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListShortStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListShortStressTest.kt
@@ -46,6 +46,7 @@
@Test
fun testStress() {
+ println("--- LockFreeLinkedListShortStressTest")
val deadline = System.currentTimeMillis() + TEST_DURATION
repeat(nAdderThreads) { threadId ->
threads += thread(start = false, name = "adder-$threadId") {
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectPhilosophersStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectPhilosophersStressTest.kt
index 714ede4..05892d6 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectPhilosophersStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectPhilosophersStressTest.kt
@@ -49,6 +49,7 @@
@Test
fun testPhilosophers() = runBlocking<Unit> {
+ println("--- SelectPhilosophersStressTest")
val timeLimit = System.currentTimeMillis() + TEST_DURATION
val philosophers = List<Deferred<Int>>(n) { id ->
async(CommonPool) {