blob: 57a476f41d48f2f4bf227d61d7b8580c463ca1f3 [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarovf16fd272017-02-07 11:26:00 +03003 */
4
Roman Elizarov7b2d8b02017-02-02 20:09:14 +03005package kotlinx.coroutines.experimental.channels
6
7import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +03008import kotlinx.coroutines.experimental.selects.*
9import org.junit.*
10import org.junit.Assert.*
11import org.junit.runner.*
12import org.junit.runners.*
13import java.util.concurrent.atomic.*
14import kotlin.coroutines.experimental.*
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030015
16@RunWith(Parameterized::class)
17class ChannelSendReceiveStressTest(
18 val kind: TestChannelKind,
19 val nSenders: Int,
20 val nReceivers: Int
Roman Elizarovebe18b42017-02-28 17:50:55 +030021) : TestBase() {
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030022 companion object {
23 @Parameterized.Parameters(name = "{0}, nSenders={1}, nReceivers={2}")
24 @JvmStatic
25 fun params(): Collection<Array<Any>> =
26 listOf(1, 2, 10).flatMap { nSenders ->
Roman Elizarov1216e912017-02-22 09:57:06 +030027 listOf(1, 10).flatMap { nReceivers ->
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030028 TestChannelKind.values().map { arrayOf<Any>(it, nSenders, nReceivers) }
29 }
30 }
31 }
32
Roman Elizarovebe18b42017-02-28 17:50:55 +030033 val timeLimit = 30_000L * stressTestMultiplier // 30 sec
Roman Elizarov7f1380a2017-10-21 17:27:27 +030034 val nEvents = 200_000 * stressTestMultiplier
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030035
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030036 val maxBuffer = 10_000 // artificial limit for LinkedListChannel
Roman Elizarovbc296bb2017-03-01 00:37:37 +030037
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030038 val channel = kind.create()
39 val sendersCompleted = AtomicInteger()
40 val receiversCompleted = AtomicInteger()
41 val dupes = AtomicInteger()
Roman Elizarovbc296bb2017-03-01 00:37:37 +030042 val sentTotal = AtomicInteger()
Roman Elizarovcbd8e402017-02-28 23:55:41 +030043 val received = AtomicIntegerArray(nEvents)
44 val receivedTotal = AtomicInteger()
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030045 val receivedBy = IntArray(nReceivers)
46
47 @Test
48 fun testSendReceiveStress() = runBlocking {
Roman Elizarovd3d335b2017-10-21 17:43:53 +030049 println("--- ChannelSendReceiveStressTest $kind with nSenders=$nSenders, nReceivers=$nReceivers")
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030050 val receivers = List(nReceivers) { receiverIndex ->
51 // different event receivers use different code
52 launch(CommonPool + CoroutineName("receiver$receiverIndex")) {
Roman Elizarov1216e912017-02-22 09:57:06 +030053 when (receiverIndex % 5) {
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030054 0 -> doReceive(receiverIndex)
55 1 -> doReceiveOrNull(receiverIndex)
56 2 -> doIterator(receiverIndex)
Roman Elizarov1216e912017-02-22 09:57:06 +030057 3 -> doReceiveSelect(receiverIndex)
58 4 -> doReceiveSelectOrNull(receiverIndex)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030059 }
60 receiversCompleted.incrementAndGet()
61 }
62 }
63 val senders = List(nSenders) { senderIndex ->
64 launch(CommonPool + CoroutineName("sender$senderIndex")) {
Roman Elizarov1216e912017-02-22 09:57:06 +030065 when (senderIndex % 2) {
66 0 -> doSend(senderIndex)
67 1 -> doSendSelect(senderIndex)
68 }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030069 sendersCompleted.incrementAndGet()
70 }
71 }
Roman Elizarov3aed4ee2017-03-06 12:21:05 +030072 // print progress
Roman Elizarov43e3af72017-07-21 16:01:31 +030073 val progressJob = launch(coroutineContext) {
Roman Elizarov3aed4ee2017-03-06 12:21:05 +030074 var seconds = 0
75 while (true) {
76 delay(1000)
77 println("${++seconds}: Sent ${sentTotal.get()}, received ${receivedTotal.get()}")
78 }
79 }
Roman Elizarov1216e912017-02-22 09:57:06 +030080 try {
81 withTimeout(timeLimit) {
82 senders.forEach { it.join() }
83 channel.close()
84 receivers.forEach { it.join() }
85 }
86 } catch (e: CancellationException) {
87 println("!!! Test timed out $e")
88 }
Roman Elizarov3aed4ee2017-03-06 12:21:05 +030089 progressJob.cancel()
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030090 println("Tested $kind with nSenders=$nSenders, nReceivers=$nReceivers")
91 println("Completed successfully ${sendersCompleted.get()} sender coroutines")
92 println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
Roman Elizarovbc296bb2017-03-01 00:37:37 +030093 println(" Sent ${sentTotal.get()} events")
Roman Elizarovcbd8e402017-02-28 23:55:41 +030094 println(" Received ${receivedTotal.get()} events")
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030095 println(" Received dupes ${dupes.get()}")
96 repeat(nReceivers) { receiveIndex ->
97 println(" Received by #$receiveIndex ${receivedBy[receiveIndex]}")
98 }
99 assertEquals(nSenders, sendersCompleted.get())
100 assertEquals(nReceivers, receiversCompleted.get())
101 assertEquals(0, dupes.get())
Roman Elizarovbc296bb2017-03-01 00:37:37 +0300102 assertEquals(nEvents, sentTotal.get())
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300103 if (!kind.isConflated) assertEquals(nEvents, receivedTotal.get())
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300104 repeat(nReceivers) { receiveIndex ->
Roman Elizarov12f961d2017-02-06 15:44:03 +0300105 assertTrue("Each receiver should have received something", receivedBy[receiveIndex] > 0)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300106 }
107 }
108
Roman Elizarovbc296bb2017-03-01 00:37:37 +0300109 private suspend fun doSent() {
110 sentTotal.incrementAndGet()
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300111 if (!kind.isConflated) {
Roman Elizarov3aed4ee2017-03-06 12:21:05 +0300112 while (sentTotal.get() > receivedTotal.get() + maxBuffer)
113 yield() // throttle fast senders to prevent OOM with LinkedListChannel
114 }
Roman Elizarovbc296bb2017-03-01 00:37:37 +0300115 }
116
Roman Elizarov1216e912017-02-22 09:57:06 +0300117 private suspend fun doSend(senderIndex: Int) {
Roman Elizarovbc296bb2017-03-01 00:37:37 +0300118 for (i in senderIndex until nEvents step nSenders) {
Roman Elizarov1216e912017-02-22 09:57:06 +0300119 channel.send(i)
Roman Elizarovbc296bb2017-03-01 00:37:37 +0300120 doSent()
121 }
Roman Elizarov1216e912017-02-22 09:57:06 +0300122 }
123
124 private suspend fun doSendSelect(senderIndex: Int) {
Roman Elizarovbc296bb2017-03-01 00:37:37 +0300125 for (i in senderIndex until nEvents step nSenders) {
Roman Elizarov1216e912017-02-22 09:57:06 +0300126 select<Unit> { channel.onSend(i) { Unit } }
Roman Elizarovbc296bb2017-03-01 00:37:37 +0300127 doSent()
128 }
Roman Elizarov1216e912017-02-22 09:57:06 +0300129 }
130
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300131 private fun doReceived(receiverIndex: Int, event: Int) {
Roman Elizarovcbd8e402017-02-28 23:55:41 +0300132 if (!received.compareAndSet(event, 0, 1)) {
Roman Elizarov1216e912017-02-22 09:57:06 +0300133 println("Duplicate event $event at $receiverIndex")
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300134 dupes.incrementAndGet()
135 }
Roman Elizarovcbd8e402017-02-28 23:55:41 +0300136 receivedTotal.incrementAndGet()
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300137 receivedBy[receiverIndex]++
138 }
139
140 private suspend fun doReceive(receiverIndex: Int) {
141 while (true) {
142 try { doReceived(receiverIndex, channel.receive()) }
143 catch (ex: ClosedReceiveChannelException) { break }
144 }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300145 }
146
147 private suspend fun doReceiveOrNull(receiverIndex: Int) {
148 while (true) {
149 doReceived(receiverIndex, channel.receiveOrNull() ?: break)
150 }
151 }
152
153 private suspend fun doIterator(receiverIndex: Int) {
154 for (event in channel) {
155 doReceived(receiverIndex, event)
156 }
157 }
Roman Elizarov1216e912017-02-22 09:57:06 +0300158
159 private suspend fun doReceiveSelect(receiverIndex: Int) {
160 while (true) {
161 try {
162 val event = select<Int> { channel.onReceive { it } }
163 doReceived(receiverIndex, event)
164 } catch (ex: ClosedReceiveChannelException) { break }
165 }
166 }
167
168 private suspend fun doReceiveSelectOrNull(receiverIndex: Int) {
169 while (true) {
170 val event = select<Int?> { channel.onReceiveOrNull { it } } ?: break
171 doReceived(receiverIndex, event)
172 }
173 }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300174}