ArrayChannel implementation and tests
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
new file mode 100644
index 0000000..32a33ca
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
@@ -0,0 +1,106 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.test.assertEquals
+import kotlin.test.assertTrue
+
+
+@RunWith(Parameterized::class)
+class ChannelSendReceiveStressTest(
+ val kind: TestChannelKind,
+ val nSenders: Int,
+ val nReceivers: Int
+) {
+ companion object {
+ @Parameterized.Parameters(name = "{0}, nSenders={1}, nReceivers={2}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> =
+ listOf(1, 2, 10).flatMap { nSenders ->
+ listOf(1, 6).flatMap { nReceivers ->
+ TestChannelKind.values().map { arrayOf<Any>(it, nSenders, nReceivers) }
+ }
+ }
+ }
+
+ val nEvents = 1_000_000
+
+ val channel = kind.create()
+ val sendersCompleted = AtomicInteger()
+ val receiversCompleted = AtomicInteger()
+ val dupes = AtomicInteger()
+ val received = ConcurrentHashMap<Int,Int>()
+ val receivedBy = IntArray(nReceivers)
+
+ @Test
+ fun testSendReceiveStress() = runBlocking {
+ val receivers = List(nReceivers) { receiverIndex ->
+ // different event receivers use different code
+ launch(CommonPool + CoroutineName("receiver$receiverIndex")) {
+ when (receiverIndex % 3) {
+ 0 -> doReceive(receiverIndex)
+ 1 -> doReceiveOrNull(receiverIndex)
+ 2 -> doIterator(receiverIndex)
+ }
+ receiversCompleted.incrementAndGet()
+ }
+ }
+ val senders = List(nSenders) { senderIndex ->
+ launch(CommonPool + CoroutineName("sender$senderIndex")) {
+ for (i in senderIndex until nEvents step nSenders)
+ channel.send(i)
+ sendersCompleted.incrementAndGet()
+ }
+ }
+ senders.forEach { it.join() }
+ channel.close()
+ receivers.forEach { it.join() }
+ println("Tested $kind with nSenders=$nSenders, nReceivers=$nReceivers")
+ println("Completed successfully ${sendersCompleted.get()} sender coroutines")
+ println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
+ println(" Received ${received.size} events")
+ println(" Received dupes ${dupes.get()}")
+ repeat(nReceivers) { receiveIndex ->
+ println(" Received by #$receiveIndex ${receivedBy[receiveIndex]}")
+ }
+ assertEquals(nSenders, sendersCompleted.get())
+ assertEquals(nReceivers, receiversCompleted.get())
+ assertEquals(0, dupes.get())
+ assertEquals(nEvents, received.size)
+ repeat(nReceivers) { receiveIndex ->
+ assertTrue(receivedBy[receiveIndex] > 0, "Each receiver should have received something")
+ }
+ }
+
+ private fun doReceived(receiverIndex: Int, event: Int) {
+ if (received.put(event, event) != null) {
+ println("Duplicate event $event")
+ dupes.incrementAndGet()
+ }
+ receivedBy[receiverIndex]++
+ }
+
+ private suspend fun doReceive(receiverIndex: Int) {
+ while (true) {
+ try { doReceived(receiverIndex, channel.receive()) }
+ catch (ex: ClosedReceiveChannelException) { break }
+ }
+
+ }
+
+ private suspend fun doReceiveOrNull(receiverIndex: Int) {
+ while (true) {
+ doReceived(receiverIndex, channel.receiveOrNull() ?: break)
+ }
+ }
+
+ private suspend fun doIterator(receiverIndex: Int) {
+ for (event in channel) {
+ doReceived(receiverIndex, event)
+ }
+ }
+}
\ No newline at end of file