blob: 4ac9535c5ff1ae3998fbbf05bb6aeb2623b5361e [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental.channels
18
19import kotlinx.coroutines.experimental.*
20import kotlinx.coroutines.experimental.selects.select
21import org.junit.After
22import org.junit.Test
23import org.junit.runner.RunWith
24import org.junit.runners.Parameterized
25import java.util.concurrent.TimeUnit
Roman Elizarov45c85652017-10-13 14:46:22 +030026import java.util.concurrent.atomic.AtomicLong
Roman Elizarov43918972017-10-07 21:00:06 +030027
/**
 * Stress test for delivery of events to multiple broadcast channel subscribers.
 *
 * One sender coroutine broadcasts an increasing sequence of `Long` values. Receivers are
 * launched one at a time; each opens its own subscription and consumes it using one of five
 * receive styles selected by `receiverIndex % 5`. Every received value is validated by
 * [doReceived].
 *
 * The test is parameterized over all values of [TestBroadcastChannelKind].
 */
@RunWith(Parameterized::class)
class BroadcastChannelMultiReceiveStressTest(
    val kind: TestBroadcastChannelKind
) : TestBase() {
    private val nReceivers = if (isStressTest) 10 else 5
    private val nSeconds = 3 * stressTestMultiplier

    private val broadcast = kind.create<Long>()

    // one thread per receiver plus one more (the sender is also launched in this pool)
    private val pool = newFixedThreadPoolContext(nReceivers + 1, "BroadcastChannelMultiReceiveStressTest")

    // last value whose `send` call returned normally
    private val sentTotal = AtomicLong()

    // number of values accepted by all receivers together
    private val receivedTotal = AtomicLong()

    // value after which a receiver stops on its own; -1 until the sender has been stopped
    private val stopOnReceive = AtomicLong(-1)

    // per-receiver last accepted value; -1 means "nothing received yet"
    private val lastReceived = Array(nReceivers) { AtomicLong(-1) }

    /** Closes the thread pool context created for this test instance. */
    @After
    fun tearDown() {
        pool.close()
    }

    /**
     * Runs the sender while receivers are ramped up, keeps everything running for [nSeconds]
     * seconds, then cancels the sender and waits (up to 5 seconds) for every receiver to reach
     * the last sent value.
     */
    @Test
    fun testStress() = runBlocking {
        println("--- BroadcastChannelMultiReceiveStressTest $kind with nReceivers=$nReceivers")
        val testContext = pool + coroutineContext[Job]!!
        val sender = launch(testContext + CoroutineName("Sender")) {
            var next = 1L
            while (isActive) {
                broadcast.send(next)
                sentTotal.set(next) // reached only if `send` was not cancelled
                next++
            }
        }
        val receivers = mutableListOf<Job>()

        fun printProgress() =
            println("Sent ${sentTotal.get()}, received ${receivedTotal.get()}, receivers=${receivers.size}")

        // ramp up receivers, one every 0.1 sec
        for (receiverIndex in 0 until nReceivers) {
            delay(100)
            val name = "Receiver$receiverIndex"
            println("Launching $name")
            receivers.add(launch(testContext + CoroutineName(name)) {
                broadcast.openSubscription().use { subscription ->
                    receiveWithStrategy(subscription, receiverIndex)
                }
            })
            printProgress()
        }

        // let the whole system run, reporting progress once per second
        repeat(nSeconds) {
            delay(1000)
            printProgress()
        }

        sender.cancelAndJoin()
        println("Tested $kind with nReceivers=$nReceivers")
        val lastSent = sentTotal.get()
        println(" Sent $lastSent events, waiting for receivers")
        // must be published before lastReceived is inspected below
        stopOnReceive.set(lastSent)
        try {
            withTimeout(5, TimeUnit.SECONDS) {
                for ((index, receiver) in receivers.withIndex()) {
                    // A receiver that already recorded the final value may have checked
                    // stopOnReceive before it was set, so it is cancelled instead of joined.
                    val alreadyGotLast = lastReceived[index].get() == lastSent
                    if (alreadyGotLast) receiver.cancel() else receiver.join()
                }
            }
        } catch (e: Exception) {
            // dump per-receiver state for diagnostics, then let the failure propagate
            println("Failed: $e")
            for (index in receivers.indices) {
                println("lastReceived[$index] = ${lastReceived[index].get()}")
            }
            throw e
        }
        println(" Received ${receivedTotal.get()} events")
    }

    /**
     * Consumes [channel] with the receive style assigned to [receiverIndex]
     * (the index modulo 5 picks one of the five `do...` functions below).
     */
    private suspend fun receiveWithStrategy(channel: ReceiveChannel<Long>, receiverIndex: Int) {
        when (receiverIndex % 5) {
            0 -> doReceive(channel, receiverIndex)
            1 -> doReceiveOrNull(channel, receiverIndex)
            2 -> doIterator(channel, receiverIndex)
            3 -> doReceiveSelect(channel, receiverIndex)
            4 -> doReceiveSelectOrNull(channel, receiverIndex)
        }
    }

    /**
     * Validates and records value [i] received by the receiver with index [receiverIndex].
     *
     * The value must be greater than the previously recorded one; when a previous value exists
     * and the channel kind is not conflated, it must be exactly the previous value plus one.
     *
     * @return `true` when [i] equals the current [stopOnReceive] value, i.e. the receiver should stop.
     * @throws IllegalStateException if an ordering check fails.
     */
    private fun doReceived(receiverIndex: Int, i: Long): Boolean {
        val slot = lastReceived[receiverIndex]
        val previous = slot.get()
        check(i > previous) { "Last was $previous, got $i" }
        val mustBeConsecutive = previous != -1L && !kind.isConflated
        if (mustBeConsecutive) {
            check(i == previous + 1) { "Last was $previous, got $i" }
        }
        receivedTotal.incrementAndGet()
        // record the value first, then look at the stop marker (testStress relies on this order)
        slot.set(i)
        return stopOnReceive.get() == i
    }

    /** Receives via `receive()` until told to stop or the channel is closed. */
    private suspend fun doReceive(channel: ReceiveChannel<Long>, receiverIndex: Int) {
        try {
            do {
                val finished = doReceived(receiverIndex, channel.receive())
            } while (!finished)
        } catch (closed: ClosedReceiveChannelException) {
            // channel closed: nothing more to receive
        }
    }

    /** Receives via `receiveOrNull()` until told to stop or `null` is returned. */
    private suspend fun doReceiveOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
        while (true) {
            val event = channel.receiveOrNull() ?: return
            if (doReceived(receiverIndex, event)) return
        }
    }

    /** Receives by iterating over the channel until told to stop or the iteration ends. */
    private suspend fun doIterator(channel: ReceiveChannel<Long>, receiverIndex: Int) {
        for (event in channel) {
            if (doReceived(receiverIndex, event)) return
        }
    }

    /** Receives via a `select` with `onReceive` until told to stop or the channel is closed. */
    private suspend fun doReceiveSelect(channel: ReceiveChannel<Long>, receiverIndex: Int) {
        try {
            do {
                val event = select<Long> { channel.onReceive { it } }
                val finished = doReceived(receiverIndex, event)
            } while (!finished)
        } catch (closed: ClosedReceiveChannelException) {
            // channel closed: nothing more to receive
        }
    }

    /** Receives via a `select` with `onReceiveOrNull` until told to stop or `null` is selected. */
    private suspend fun doReceiveSelectOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
        while (true) {
            val event = select<Long?> { channel.onReceiveOrNull { it } } ?: return
            if (doReceived(receiverIndex, event)) return
        }
    }

    companion object {
        /** Supplies one parameter set per [TestBroadcastChannelKind] value. */
        @Parameterized.Parameters(name = "{0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> =
            TestBroadcastChannelKind.values().map { arrayOf<Any>(it) }
    }
}