blob: 220afbe9cea9254d26caf47166ffefd516e3227e [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental.channels
18
19import kotlinx.coroutines.experimental.*
20import kotlinx.coroutines.experimental.selects.select
21import org.junit.After
22import org.junit.Test
23import org.junit.runner.RunWith
24import org.junit.runners.Parameterized
25import java.util.concurrent.TimeUnit
Roman Elizarov45c85652017-10-13 14:46:22 +030026import java.util.concurrent.atomic.AtomicLong
Roman Elizarov43918972017-10-07 21:00:06 +030027
/**
 * Tests delivery of events to multiple broadcast channel subscribers.
 *
 * One sender coroutine pushes an increasing sequence of `Long` values (1, 2, 3, ...) into a
 * broadcast channel created by [kind]. Receivers are added one per main-loop iteration, up to
 * [nReceivers], and each receiver consumes its subscription through one of five receive styles
 * chosen by `receiverIndex % 5`. Every received value is validated by [doReceived].
 */
@RunWith(Parameterized::class)
class BroadcastChannelMultiReceiveStressTest(
    val kind: TestBroadcastChannelKind
) : TestBase() {
    // Maximum number of receiver coroutines that get launched.
    private val nReceivers = if (isStressTest) 10 else 5

    // Number of main-loop iterations in testStress; each iteration ends with delay(100).
    private val nSeconds = 5 * stressTestMultiplier

    private val broadcast = kind.create<Long>()

    // One thread per receiver plus one for the sender.
    private val pool = newFixedThreadPoolContext(nReceivers + 1, "BroadcastChannelMultiReceiveStressTest")

    // Last value whose `send` completed.
    private val sentTotal = AtomicLong()

    // Number of events accepted by all receivers together.
    private val receivedTotal = AtomicLong()

    // Value on which receivers stop; stays at -1 until the sender is finished.
    private val stopOnReceive = AtomicLong(-1)

    // Per-receiver last accepted value; -1 means "nothing received yet".
    private val lastReceived = Array(nReceivers) { AtomicLong(-1) }

    @After
    fun tearDown() {
        pool.close()
    }

    @Test
    fun testStress() = runBlocking {
        val ctx = pool + coroutineContext[Job]!!
        val sender = launch(context = ctx + CoroutineName("Sender")) {
            var i = 0L
            while (isActive) {
                i++
                broadcast.send(i)
                // Reached only when `send` was not cancelled, so sentTotal never runs ahead.
                sentTotal.set(i)
            }
        }
        val receivers = mutableListOf<Job>()
        for (sec in 0 until nSeconds) {
            // Add one more receiver per iteration until the maximum is reached.
            if (receivers.size < nReceivers) {
                val receiverIndex = receivers.size
                val name = "Receiver$receiverIndex"
                println("$sec: Launching $name")
                val receiver = launch(ctx + CoroutineName(name)) {
                    broadcast.openSubscription().use { sub -> receiveWith(sub, receiverIndex) }
                }
                receivers.add(receiver)
            }
            // Let sender and receivers run for a while before reporting progress.
            delay(100)
            println("${sec + 1}: Sent ${sentTotal.get()}, received ${receivedTotal.get()}, receivers=${receivers.size}")
        }
        sender.cancelAndJoin()
        println("Tested with nReceivers=$nReceivers")
        val total = sentTotal.get()
        println(" Sent $total events, waiting for receivers")
        stopOnReceive.set(total)
        try {
            withTimeout(5, TimeUnit.SECONDS) {
                for ((index, receiver) in receivers.withIndex()) {
                    // A receiver that already has the final value may have read stopOnReceive
                    // before it was set and be waiting for more, so it is cancelled, not joined.
                    val sawLastEvent = lastReceived[index].get() == total
                    if (sawLastEvent) receiver.cancel() else receiver.join()
                }
            }
        } catch (e: Exception) {
            // Dump per-receiver progress for diagnostics, then fail the test with the same error.
            println("Failed: $e")
            for (index in receivers.indices) {
                println("lastReceived[$index] = ${lastReceived[index].get()}")
            }
            throw e
        }
        println(" Received ${receivedTotal.get()} events")
    }

    /**
     * Runs the receive loop selected by `receiverIndex % 5` on subscription [sub].
     */
    private suspend fun receiveWith(sub: ReceiveChannel<Long>, receiverIndex: Int) {
        when (receiverIndex % 5) {
            0 -> doReceive(sub, receiverIndex)
            1 -> doReceiveOrNull(sub, receiverIndex)
            2 -> doIterator(sub, receiverIndex)
            3 -> doReceiveSelect(sub, receiverIndex)
            4 -> doReceiveSelectOrNull(sub, receiverIndex)
        }
    }

    /**
     * Validates and records event [i] for the receiver with index [receiverIndex].
     *
     * The value must be greater than the previously recorded one and, unless this is the
     * receiver's first event or [kind] is conflated, exactly one greater.
     *
     * @return `true` when [i] equals the current [stopOnReceive] value, i.e. the receiver should stop.
     */
    private fun doReceived(receiverIndex: Int, i: Long): Boolean {
        val slot = lastReceived[receiverIndex]
        val last = slot.get()
        check(i > last) { "Last was $last, got $i" }
        val isFirstEvent = last == -1L
        if (!isFirstEvent && !kind.isConflated) {
            check(i == last + 1) { "Last was $last, got $i" }
        }
        receivedTotal.incrementAndGet()
        slot.set(i)
        return i == stopOnReceive.get()
    }

    /** Consumes [channel] with `receive`, finishing on the stop value or when the channel is closed. */
    private suspend fun doReceive(channel: ReceiveChannel<Long>, receiverIndex: Int) {
        try {
            while (true) {
                if (doReceived(receiverIndex, channel.receive())) return
            }
        } catch (ex: ClosedReceiveChannelException) {
            // Channel closed: nothing more to receive.
        }
    }

    /** Consumes [channel] with `receiveOrNull`, finishing on the stop value or on `null`. */
    private suspend fun doReceiveOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
        while (true) {
            val event = channel.receiveOrNull() ?: return
            if (doReceived(receiverIndex, event)) return
        }
    }

    /** Consumes [channel] by iterating over it, finishing on the stop value or when iteration ends. */
    private suspend fun doIterator(channel: ReceiveChannel<Long>, receiverIndex: Int) {
        for (event in channel) {
            if (doReceived(receiverIndex, event)) break
        }
    }

    /** Consumes [channel] via `select` with `onReceive`, finishing on the stop value or channel close. */
    private suspend fun doReceiveSelect(channel: ReceiveChannel<Long>, receiverIndex: Int) {
        try {
            while (true) {
                val event = select<Long> { channel.onReceive { it } }
                if (doReceived(receiverIndex, event)) return
            }
        } catch (ex: ClosedReceiveChannelException) {
            // Channel closed: nothing more to receive.
        }
    }

    /** Consumes [channel] via `select` with `onReceiveOrNull`, finishing on the stop value or on `null`. */
    private suspend fun doReceiveSelectOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
        while (true) {
            val event = select<Long?> { channel.onReceiveOrNull { it } } ?: return
            if (doReceived(receiverIndex, event)) return
        }
    }

    companion object {
        @Parameterized.Parameters(name = "{0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> =
            TestBroadcastChannelKind.values().map { value -> arrayOf<Any>(value) }
    }
}