blob: 54ba7b639f78ec91b981aabbd0b8a78aded1005e [file] [log] [blame]
Roman Elizarov43918972017-10-07 21:00:06 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov43918972017-10-07 21:00:06 +03003 */
4
Roman Elizarov0950dfa2018-07-13 10:33:25 +03005package kotlinx.coroutines.channels
Roman Elizarov43918972017-10-07 21:00:06 +03006
Roman Elizarov0950dfa2018-07-13 10:33:25 +03007import kotlinx.coroutines.*
8import kotlinx.coroutines.selects.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +03009import org.junit.*
10import org.junit.runner.*
11import org.junit.runners.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030012import java.util.concurrent.atomic.*
Roman Elizarov43918972017-10-07 21:00:06 +030013
14/**
15 * Tests delivery of events to multiple broadcast channel subscribers.
16 */
17@RunWith(Parameterized::class)
18class BroadcastChannelMultiReceiveStressTest(
Vsevolod Tolstopyatova2d80882018-09-24 19:51:49 +030019 private val kind: TestBroadcastChannelKind
Roman Elizarov43918972017-10-07 21:00:06 +030020) : TestBase() {
21 companion object {
22 @Parameterized.Parameters(name = "{0}")
23 @JvmStatic
24 fun params(): Collection<Array<Any>> =
25 TestBroadcastChannelKind.values().map { arrayOf<Any>(it) }
26 }
27
28 private val nReceivers = if (isStressTest) 10 else 5
Roman Elizarovd3d335b2017-10-21 17:43:53 +030029 private val nSeconds = 3 * stressTestMultiplier
Roman Elizarov43918972017-10-07 21:00:06 +030030
Roman Elizarov45c85652017-10-13 14:46:22 +030031 private val broadcast = kind.create<Long>()
Roman Elizarov43918972017-10-07 21:00:06 +030032 private val pool = newFixedThreadPoolContext(nReceivers + 1, "BroadcastChannelMultiReceiveStressTest")
33
Roman Elizarov45c85652017-10-13 14:46:22 +030034 private val sentTotal = AtomicLong()
35 private val receivedTotal = AtomicLong()
36 private val stopOnReceive = AtomicLong(-1)
37 private val lastReceived = Array(nReceivers) { AtomicLong(-1) }
Roman Elizarov43918972017-10-07 21:00:06 +030038
39 @After
40 fun tearDown() {
Roman Elizarovd9ae2bc2017-10-20 17:36:56 +080041 pool.close()
Roman Elizarov43918972017-10-07 21:00:06 +030042 }
43
44 @Test
45 fun testStress() = runBlocking {
Roman Elizarovd3d335b2017-10-21 17:43:53 +030046 println("--- BroadcastChannelMultiReceiveStressTest $kind with nReceivers=$nReceivers")
Roman Elizarov43918972017-10-07 21:00:06 +030047 val sender =
Roman Elizarov6e3ffb12018-09-14 13:46:58 +030048 launch(pool + CoroutineName("Sender")) {
Roman Elizarov45c85652017-10-13 14:46:22 +030049 var i = 0L
Roman Elizarov43918972017-10-07 21:00:06 +030050 while (isActive) {
Roman Elizarovfdd6db52017-10-07 23:01:20 +030051 broadcast.send(++i)
52 sentTotal.set(i) // set sentTotal only if `send` was not cancelled
Roman Elizarov43918972017-10-07 21:00:06 +030053 }
54 }
55 val receivers = mutableListOf<Job>()
Roman Elizarovd3d335b2017-10-21 17:43:53 +030056 fun printProgress() {
57 println("Sent ${sentTotal.get()}, received ${receivedTotal.get()}, receivers=${receivers.size}")
58 }
59 // ramp up receivers
60 repeat(nReceivers) {
61 delay(100) // wait 0.1 sec
62 val receiverIndex = receivers.size
63 val name = "Receiver$receiverIndex"
64 println("Launching $name")
Roman Elizarov6e3ffb12018-09-14 13:46:58 +030065 receivers += launch(pool + CoroutineName(name)) {
Marko Devcic1d6230a2018-04-04 20:13:08 +020066 val channel = broadcast.openSubscription()
Roman Elizarovd3d335b2017-10-21 17:43:53 +030067 when (receiverIndex % 5) {
Marko Devcic1d6230a2018-04-04 20:13:08 +020068 0 -> doReceive(channel, receiverIndex)
69 1 -> doReceiveOrNull(channel, receiverIndex)
70 2 -> doIterator(channel, receiverIndex)
71 3 -> doReceiveSelect(channel, receiverIndex)
72 4 -> doReceiveSelectOrNull(channel, receiverIndex)
Roman Elizarov43918972017-10-07 21:00:06 +030073 }
Marko Devcic1d6230a2018-04-04 20:13:08 +020074 channel.cancel()
Roman Elizarov43918972017-10-07 21:00:06 +030075 }
Roman Elizarovd3d335b2017-10-21 17:43:53 +030076 printProgress()
77 }
78 // wait
Vsevolod Tolstopyatov87f2faa2018-04-30 22:53:02 +030079 repeat(nSeconds) { _ ->
Roman Elizarovd3d335b2017-10-21 17:43:53 +030080 delay(1000)
81 printProgress()
Roman Elizarov43918972017-10-07 21:00:06 +030082 }
83 sender.cancelAndJoin()
Roman Elizarovd3d335b2017-10-21 17:43:53 +030084 println("Tested $kind with nReceivers=$nReceivers")
Roman Elizarov43918972017-10-07 21:00:06 +030085 val total = sentTotal.get()
86 println(" Sent $total events, waiting for receivers")
87 stopOnReceive.set(total)
Roman Elizarovfdd6db52017-10-07 23:01:20 +030088 try {
Roman Elizarov27b8f452018-09-20 21:23:41 +030089 withTimeout(5000) {
Roman Elizarovfdd6db52017-10-07 23:01:20 +030090 receivers.forEachIndexed { index, receiver ->
91 if (lastReceived[index].get() == total)
92 receiver.cancel()
93 else
94 receiver.join()
95 }
Roman Elizarov43918972017-10-07 21:00:06 +030096 }
Roman Elizarovfdd6db52017-10-07 23:01:20 +030097 } catch (e: Exception) {
98 println("Failed: $e")
Roman Elizarovffc61ae2017-10-26 19:29:52 +030099 pool.dumpThreads("Threads in pool")
Roman Elizarovfdd6db52017-10-07 23:01:20 +0300100 receivers.indices.forEach { index ->
101 println("lastReceived[$index] = ${lastReceived[index].get()}")
102 }
103 throw e
Roman Elizarov43918972017-10-07 21:00:06 +0300104 }
105 println(" Received ${receivedTotal.get()} events")
106 }
107
Roman Elizarov45c85652017-10-13 14:46:22 +0300108 private fun doReceived(receiverIndex: Int, i: Long): Boolean {
Roman Elizarov43918972017-10-07 21:00:06 +0300109 val last = lastReceived[receiverIndex].get()
110 check(i > last) { "Last was $last, got $i" }
Roman Elizarov45c85652017-10-13 14:46:22 +0300111 if (last != -1L && !kind.isConflated)
Roman Elizarov43918972017-10-07 21:00:06 +0300112 check(i == last + 1) { "Last was $last, got $i" }
113 receivedTotal.incrementAndGet()
114 lastReceived[receiverIndex].set(i)
115 return i == stopOnReceive.get()
116 }
117
Roman Elizarov45c85652017-10-13 14:46:22 +0300118 private suspend fun doReceive(channel: ReceiveChannel<Long>, receiverIndex: Int) {
Roman Elizarov43918972017-10-07 21:00:06 +0300119 while (true) {
120 try {
121 val stop = doReceived(receiverIndex, channel.receive())
122 if (stop) break
123 }
124 catch (ex: ClosedReceiveChannelException) { break }
125 }
126 }
127
Roman Elizarov45c85652017-10-13 14:46:22 +0300128 private suspend fun doReceiveOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
Roman Elizarov43918972017-10-07 21:00:06 +0300129 while (true) {
130 val stop = doReceived(receiverIndex, channel.receiveOrNull() ?: break)
131 if (stop) break
132 }
133 }
134
Roman Elizarov45c85652017-10-13 14:46:22 +0300135 private suspend fun doIterator(channel: ReceiveChannel<Long>, receiverIndex: Int) {
Roman Elizarov43918972017-10-07 21:00:06 +0300136 for (event in channel) {
137 val stop = doReceived(receiverIndex, event)
138 if (stop) break
139 }
140 }
141
Roman Elizarov45c85652017-10-13 14:46:22 +0300142 private suspend fun doReceiveSelect(channel: ReceiveChannel<Long>, receiverIndex: Int) {
Roman Elizarov43918972017-10-07 21:00:06 +0300143 while (true) {
144 try {
Roman Elizarov45c85652017-10-13 14:46:22 +0300145 val event = select<Long> { channel.onReceive { it } }
Roman Elizarov43918972017-10-07 21:00:06 +0300146 val stop = doReceived(receiverIndex, event)
147 if (stop) break
148 } catch (ex: ClosedReceiveChannelException) { break }
149 }
150 }
151
Roman Elizarov45c85652017-10-13 14:46:22 +0300152 private suspend fun doReceiveSelectOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
Roman Elizarov43918972017-10-07 21:00:06 +0300153 while (true) {
Roman Elizarov45c85652017-10-13 14:46:22 +0300154 val event = select<Long?> { channel.onReceiveOrNull { it } } ?: break
Roman Elizarov43918972017-10-07 21:00:06 +0300155 val stop = doReceived(receiverIndex, event)
156 if (stop) break
157 }
158 }
159}