blob: 32a33ca7e7928d7603b044103ab7dc121e8eb85f [file] [log] [blame]
Roman Elizarov7b2d8b02017-02-02 20:09:14 +03001package kotlinx.coroutines.experimental.channels
2
3import kotlinx.coroutines.experimental.*
4import org.junit.Test
5import org.junit.runner.RunWith
6import org.junit.runners.Parameterized
7import java.util.concurrent.ConcurrentHashMap
8import java.util.concurrent.atomic.AtomicInteger
9import kotlin.test.assertEquals
10import kotlin.test.assertTrue
11
12
13@RunWith(Parameterized::class)
14class ChannelSendReceiveStressTest(
15 val kind: TestChannelKind,
16 val nSenders: Int,
17 val nReceivers: Int
18) {
19 companion object {
20 @Parameterized.Parameters(name = "{0}, nSenders={1}, nReceivers={2}")
21 @JvmStatic
22 fun params(): Collection<Array<Any>> =
23 listOf(1, 2, 10).flatMap { nSenders ->
24 listOf(1, 6).flatMap { nReceivers ->
25 TestChannelKind.values().map { arrayOf<Any>(it, nSenders, nReceivers) }
26 }
27 }
28 }
29
30 val nEvents = 1_000_000
31
32 val channel = kind.create()
33 val sendersCompleted = AtomicInteger()
34 val receiversCompleted = AtomicInteger()
35 val dupes = AtomicInteger()
36 val received = ConcurrentHashMap<Int,Int>()
37 val receivedBy = IntArray(nReceivers)
38
39 @Test
40 fun testSendReceiveStress() = runBlocking {
41 val receivers = List(nReceivers) { receiverIndex ->
42 // different event receivers use different code
43 launch(CommonPool + CoroutineName("receiver$receiverIndex")) {
44 when (receiverIndex % 3) {
45 0 -> doReceive(receiverIndex)
46 1 -> doReceiveOrNull(receiverIndex)
47 2 -> doIterator(receiverIndex)
48 }
49 receiversCompleted.incrementAndGet()
50 }
51 }
52 val senders = List(nSenders) { senderIndex ->
53 launch(CommonPool + CoroutineName("sender$senderIndex")) {
54 for (i in senderIndex until nEvents step nSenders)
55 channel.send(i)
56 sendersCompleted.incrementAndGet()
57 }
58 }
59 senders.forEach { it.join() }
60 channel.close()
61 receivers.forEach { it.join() }
62 println("Tested $kind with nSenders=$nSenders, nReceivers=$nReceivers")
63 println("Completed successfully ${sendersCompleted.get()} sender coroutines")
64 println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
65 println(" Received ${received.size} events")
66 println(" Received dupes ${dupes.get()}")
67 repeat(nReceivers) { receiveIndex ->
68 println(" Received by #$receiveIndex ${receivedBy[receiveIndex]}")
69 }
70 assertEquals(nSenders, sendersCompleted.get())
71 assertEquals(nReceivers, receiversCompleted.get())
72 assertEquals(0, dupes.get())
73 assertEquals(nEvents, received.size)
74 repeat(nReceivers) { receiveIndex ->
75 assertTrue(receivedBy[receiveIndex] > 0, "Each receiver should have received something")
76 }
77 }
78
79 private fun doReceived(receiverIndex: Int, event: Int) {
80 if (received.put(event, event) != null) {
81 println("Duplicate event $event")
82 dupes.incrementAndGet()
83 }
84 receivedBy[receiverIndex]++
85 }
86
87 private suspend fun doReceive(receiverIndex: Int) {
88 while (true) {
89 try { doReceived(receiverIndex, channel.receive()) }
90 catch (ex: ClosedReceiveChannelException) { break }
91 }
92
93 }
94
95 private suspend fun doReceiveOrNull(receiverIndex: Int) {
96 while (true) {
97 doReceived(receiverIndex, channel.receiveOrNull() ?: break)
98 }
99 }
100
101 private suspend fun doIterator(receiverIndex: Int) {
102 for (event in channel) {
103 doReceived(receiverIndex, event)
104 }
105 }
106}