blob: 2f58efe13d9738b313932ae070674a4363633e3a [file] [log] [blame]
Roman Elizarov7b2d8b02017-02-02 20:09:14 +03001package kotlinx.coroutines.experimental.channels
2
3import kotlinx.coroutines.experimental.*
4import org.junit.Test
5import org.junit.runner.RunWith
6import org.junit.runners.Parameterized
7import java.util.concurrent.ConcurrentHashMap
8import java.util.concurrent.atomic.AtomicInteger
Roman Elizarov12f961d2017-02-06 15:44:03 +03009import org.junit.Assert.*
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030010
/**
 * Stress test that pushes [nEvents] distinct integers through a single channel using
 * [nSenders] concurrent sender coroutines and [nReceivers] concurrent receiver coroutines.
 *
 * The test is parameterized over every [TestChannelKind] value combined with several
 * sender/receiver counts. It passes when every event is received exactly once, every
 * coroutine runs to completion, and every receiver has received at least one event.
 */
@RunWith(Parameterized::class)
class ChannelSendReceiveStressTest(
    val kind: TestChannelKind,
    val nSenders: Int,
    val nReceivers: Int
) {
    companion object {
        /**
         * Builds the parameter sets: sender counts (1, 2, 10) x receiver counts (1, 6) x
         * all channel kinds, with the channel kind varying fastest.
         */
        @Parameterized.Parameters(name = "{0}, nSenders={1}, nReceivers={2}")
        @JvmStatic
        fun params(): Collection<Array<Any>> {
            val combinations = ArrayList<Array<Any>>()
            for (senderCount in listOf(1, 2, 10)) {
                for (receiverCount in listOf(1, 6)) {
                    for (channelKind in TestChannelKind.values()) {
                        combinations.add(arrayOf<Any>(channelKind, senderCount, receiverCount))
                    }
                }
            }
            return combinations
        }
    }

    /** Total number of events sent through the channel (the integers `0 until nEvents`). */
    val nEvents = 1_000_000

    /** Channel under test, created by the parameterized [kind]. */
    val channel = kind.create()

    /** Number of sender coroutines that ran to the end of their body. */
    val sendersCompleted = AtomicInteger()

    /** Number of receiver coroutines that ran to the end of their body. */
    val receiversCompleted = AtomicInteger()

    /** Number of events that were observed more than once. */
    val dupes = AtomicInteger()

    /** Every event seen so far, mapped to itself; used to detect duplicates. */
    val received = ConcurrentHashMap<Int,Int>()

    /** Per-receiver event counters; each receiver increments only its own slot. */
    val receivedBy = IntArray(nReceivers)

    /**
     * Launches all receivers and senders, waits for the senders, closes the channel,
     * waits for the receivers, then prints a summary and checks the invariants.
     */
    @Test
    fun testSendReceiveStress() = runBlocking<Unit> {
        // Receivers rotate through three different receive styles based on their index.
        val receivers = (0 until nReceivers).map { receiverIndex ->
            launch(CommonPool + CoroutineName("receiver$receiverIndex")) {
                when (receiverIndex % 3) {
                    0 -> doReceive(receiverIndex)
                    1 -> doReceiveOrNull(receiverIndex)
                    2 -> doIterator(receiverIndex)
                }
                receiversCompleted.incrementAndGet()
            }
        }
        // Sender k emits k, k + nSenders, k + 2 * nSenders, ... so that the senders
        // together cover every value below nEvents exactly once.
        val senders = (0 until nSenders).map { senderIndex ->
            launch(CommonPool + CoroutineName("sender$senderIndex")) {
                var event = senderIndex
                while (event < nEvents) {
                    channel.send(event)
                    event += nSenders
                }
                sendersCompleted.incrementAndGet()
            }
        }
        for (sender in senders) {
            sender.join()
        }
        // The channel is closed only after all senders are done; the receiver loops
        // terminate once they observe the closed channel.
        channel.close()
        for (receiver in receivers) {
            receiver.join()
        }

        println("Tested $kind with nSenders=$nSenders, nReceivers=$nReceivers")
        println("Completed successfully ${sendersCompleted.get()} sender coroutines")
        println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
        println(" Received ${received.size} events")
        println(" Received dupes ${dupes.get()}")
        for (receiveIndex in 0 until nReceivers) {
            println(" Received by #$receiveIndex ${receivedBy[receiveIndex]}")
        }

        assertEquals(nSenders, sendersCompleted.get())
        assertEquals(nReceivers, receiversCompleted.get())
        assertEquals(0, dupes.get())
        assertEquals(nEvents, received.size)
        for (receiveIndex in 0 until nReceivers) {
            assertTrue("Each receiver should have received something", receivedBy[receiveIndex] > 0)
        }
    }

    /**
     * Records that receiver [receiverIndex] got [event]: counts a duplicate if the event
     * was already recorded, then bumps that receiver's counter.
     */
    private fun doReceived(receiverIndex: Int, event: Int) {
        val previous = received.put(event, event)
        if (previous != null) {
            println("Duplicate event $event")
            dupes.incrementAndGet()
        }
        receivedBy[receiverIndex] += 1
    }

    /** Receives with `receive()` until it throws [ClosedReceiveChannelException]. */
    private suspend fun doReceive(receiverIndex: Int) {
        try {
            while (true) {
                doReceived(receiverIndex, channel.receive())
            }
        } catch (closed: ClosedReceiveChannelException) {
            // Thrown by receive() to end the loop; nothing more to do.
        }
    }

    /** Receives with `receiveOrNull()` until it returns `null`. */
    private suspend fun doReceiveOrNull(receiverIndex: Int) {
        var event = channel.receiveOrNull()
        while (event != null) {
            doReceived(receiverIndex, event)
            event = channel.receiveOrNull()
        }
    }

    /** Receives through the channel's iterator until it reports no more elements. */
    private suspend fun doIterator(receiverIndex: Int) {
        val events = channel.iterator()
        while (events.hasNext()) {
            doReceived(receiverIndex, events.next())
        }
    }
}