ArrayChannel implementation and tests
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
new file mode 100644
index 0000000..2f837e5
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
@@ -0,0 +1,133 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+
+class ArrayChannelTest : TestBase() {
+ @Test
+ fun testSimple() = runBlocking {
+ val q = ArrayChannel<Int>(1)
+ check(q.isEmpty && !q.isFull)
+ expect(1)
+ val sender = launch(context) {
+ expect(4)
+ q.send(1) // success -- buffered
+ check(!q.isEmpty && q.isFull)
+ expect(5)
+ q.send(2) // suspends (buffer full)
+ expect(9)
+ }
+ expect(2)
+ val receiver = launch(context) {
+ expect(6)
+ check(q.receive() == 1) // does not suspend -- took from buffer
+ check(!q.isEmpty && q.isFull) // waiting sender's element moved to buffer
+ expect(7)
+ check(q.receive() == 2) // does not suspend (takes from sender)
+ expect(8)
+ }
+ expect(3)
+ sender.join()
+ receiver.join()
+ check(q.isEmpty && !q.isFull)
+ finish(10)
+ }
+
+ @Test
+ fun testStress() = runBlocking {
+ val n = 100_000
+ val q = ArrayChannel<Int>(1)
+ val sender = launch(context) {
+ for (i in 1..n) q.send(i)
+ expect(2)
+ }
+ val receiver = launch(context) {
+ for (i in 1..n) check(q.receive() == i)
+ expect(3)
+ }
+ expect(1)
+ sender.join()
+ receiver.join()
+ finish(4)
+ }
+
+ @Test
+ fun testClosedBufferedReceiveOrNull() = runBlocking {
+ val q = ArrayChannel<Int>(1)
+ check(q.isEmpty && !q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+ expect(1)
+ launch(context) {
+ expect(5)
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+ assertEquals(42, q.receiveOrNull())
+ expect(6)
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+ assertEquals(null, q.receiveOrNull())
+ expect(7)
+ }
+ expect(2)
+ q.send(42) // buffers
+ expect(3)
+ q.close() // goes on
+ expect(4)
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+ yield()
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+ finish(8)
+ }
+
+ @Test
+ fun testClosedExceptions() = runBlocking {
+ val q = ArrayChannel<Int>(1)
+ expect(1)
+ launch(context) {
+ expect(4)
+ try { q.receive() }
+ catch (e: ClosedReceiveChannelException) {
+ expect(5)
+ }
+ }
+ expect(2)
+ q.close()
+ expect(3)
+ yield()
+ expect(6)
+ try { q.send(42) }
+ catch (e: ClosedSendChannelException) {
+ finish(7)
+ }
+ }
+
+ @Test
+ fun testOfferAndPool() = runBlocking {
+ val q = ArrayChannel<Int>(1)
+ assertTrue(q.offer(1))
+ expect(1)
+ launch(context) {
+ expect(3)
+ assertEquals(1, q.poll())
+ expect(4)
+ assertEquals(null, q.poll())
+ expect(5)
+ assertEquals(2, q.receive()) // suspends
+ expect(9)
+ assertEquals(3, q.poll())
+ expect(10)
+ assertEquals(null, q.poll())
+ expect(11)
+ }
+ expect(2)
+ yield()
+ expect(6)
+ assertTrue(q.offer(2))
+ expect(7)
+ assertTrue(q.offer(3))
+ expect(8)
+ assertFalse(q.offer(4))
+ yield()
+ finish(12)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
similarity index 86%
rename from kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
rename to kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
index d444a5d..a395ce4 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
@@ -2,13 +2,22 @@
import kotlinx.coroutines.experimental.*
import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
import java.util.*
import kotlin.test.assertEquals
-class RendezvousChannelAtomicityStressTest {
+@RunWith(Parameterized::class)
+class ChannelAtomicCancelStressTest(val kind: TestChannelKind) {
+ companion object {
+ @Parameterized.Parameters(name = "{0}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> = TestChannelKind.values().map { arrayOf<Any>(it) }
+ }
+
val TEST_DURATION = 3000L
- val channel = RendezvousChannel<Int>()
+ val channel = kind.create()
val senderDone = RendezvousChannel<Boolean>()
val receiverDone = RendezvousChannel<Boolean>()
@@ -25,7 +34,7 @@
lateinit var receiver: Job
@Test
- fun testStress() = runBlocking {
+ fun testAtomicCancelStress() = runBlocking {
val deadline = System.currentTimeMillis() + TEST_DURATION
launchSender()
launchReceiver()
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
new file mode 100644
index 0000000..32a33ca
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
@@ -0,0 +1,106 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.test.assertEquals
+import kotlin.test.assertTrue
+
+
+@RunWith(Parameterized::class)
+class ChannelSendReceiveStressTest(
+ val kind: TestChannelKind,
+ val nSenders: Int,
+ val nReceivers: Int
+) {
+ companion object {
+ @Parameterized.Parameters(name = "{0}, nSenders={1}, nReceivers={2}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> =
+ listOf(1, 2, 10).flatMap { nSenders ->
+ listOf(1, 6).flatMap { nReceivers ->
+ TestChannelKind.values().map { arrayOf<Any>(it, nSenders, nReceivers) }
+ }
+ }
+ }
+
+ val nEvents = 1_000_000
+
+ val channel = kind.create()
+ val sendersCompleted = AtomicInteger()
+ val receiversCompleted = AtomicInteger()
+ val dupes = AtomicInteger()
+ val received = ConcurrentHashMap<Int,Int>()
+ val receivedBy = IntArray(nReceivers)
+
+ @Test
+ fun testSendReceiveStress() = runBlocking {
+ val receivers = List(nReceivers) { receiverIndex ->
+ // different event receivers use different code
+ launch(CommonPool + CoroutineName("receiver$receiverIndex")) {
+ when (receiverIndex % 3) {
+ 0 -> doReceive(receiverIndex)
+ 1 -> doReceiveOrNull(receiverIndex)
+ 2 -> doIterator(receiverIndex)
+ }
+ receiversCompleted.incrementAndGet()
+ }
+ }
+ val senders = List(nSenders) { senderIndex ->
+ launch(CommonPool + CoroutineName("sender$senderIndex")) {
+ for (i in senderIndex until nEvents step nSenders)
+ channel.send(i)
+ sendersCompleted.incrementAndGet()
+ }
+ }
+ senders.forEach { it.join() }
+ channel.close()
+ receivers.forEach { it.join() }
+ println("Tested $kind with nSenders=$nSenders, nReceivers=$nReceivers")
+ println("Completed successfully ${sendersCompleted.get()} sender coroutines")
+ println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
+ println(" Received ${received.size} events")
+ println(" Received dupes ${dupes.get()}")
+ repeat(nReceivers) { receiveIndex ->
+ println(" Received by #$receiveIndex ${receivedBy[receiveIndex]}")
+ }
+ assertEquals(nSenders, sendersCompleted.get())
+ assertEquals(nReceivers, receiversCompleted.get())
+ assertEquals(0, dupes.get())
+ assertEquals(nEvents, received.size)
+ repeat(nReceivers) { receiveIndex ->
+ assertTrue(receivedBy[receiveIndex] > 0, "Each receiver should have received something")
+ }
+ }
+
+ private fun doReceived(receiverIndex: Int, event: Int) {
+ if (received.put(event, event) != null) {
+ println("Duplicate event $event")
+ dupes.incrementAndGet()
+ }
+ receivedBy[receiverIndex]++
+ }
+
+ private suspend fun doReceive(receiverIndex: Int) {
+ while (true) {
+ try { doReceived(receiverIndex, channel.receive()) }
+ catch (ex: ClosedReceiveChannelException) { break }
+ }
+
+ }
+
+ private suspend fun doReceiveOrNull(receiverIndex: Int) {
+ while (true) {
+ doReceived(receiverIndex, channel.receiveOrNull() ?: break)
+ }
+ }
+
+ private suspend fun doIterator(receiverIndex: Int) {
+ for (event in channel) {
+ doReceived(receiverIndex, event)
+ }
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt
deleted file mode 100644
index a8269c3..0000000
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt
+++ /dev/null
@@ -1,84 +0,0 @@
-package kotlinx.coroutines.experimental.channels
-
-import kotlinx.coroutines.experimental.CommonPool
-import kotlinx.coroutines.experimental.join
-import kotlinx.coroutines.experimental.launch
-import kotlinx.coroutines.experimental.runBlocking
-import org.junit.Test
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicInteger
-import kotlin.test.assertEquals
-import kotlin.test.assertTrue
-
-class RendezvousChannelMultipleReceiversStressTest {
- val nEvents = 1_000_000
- val nReceivers = 6
-
- val channel = RendezvousChannel<Int>()
- val completedSuccessfully = AtomicInteger()
- val dupes = AtomicInteger()
- val received = ConcurrentHashMap<Int,Int>()
- val receivedBy = IntArray(nReceivers)
-
- @Test
- fun testStress() = runBlocking {
- val receivers = List(nReceivers) { receiverIndex ->
- // different event receivers use different code
- launch(CommonPool) {
- when (receiverIndex % 3) {
- 0 -> doReceive(receiverIndex)
- 1 -> doReceiveOrNull(receiverIndex)
- 2 -> doIterator(receiverIndex)
- }
- completedSuccessfully.incrementAndGet()
- }
- }
- repeat(nEvents) {
- channel.send(it)
- }
- channel.close()
- receivers.forEach { it.join() }
- println("Completed successfully ${completedSuccessfully.get()} coroutines")
- println(" Received ${received.size} events")
- println(" Received dupes ${dupes.get()}")
- repeat(nReceivers) { receiveIndex ->
- println(" Received by #$receiveIndex ${receivedBy[receiveIndex]}")
- }
- assertEquals(nReceivers, completedSuccessfully.get())
- assertEquals(0, dupes.get())
- assertEquals(nEvents, received.size)
- repeat(nReceivers) { receiveIndex ->
- assertTrue(receivedBy[receiveIndex] > nEvents / nReceivers / 2, "Should be balanced")
- }
- }
-
- private fun doReceived(event: Int) {
- if (received.put(event, event) != null) {
- println("Duplicate event $event")
- dupes.incrementAndGet()
- }
- }
-
- private suspend fun doReceive(receiverIndex: Int) {
- while (true) {
- try { doReceived(channel.receive()) }
- catch (ex: ClosedReceiveChannelException) { break }
- receivedBy[receiverIndex]++
- }
-
- }
-
- private suspend fun doReceiveOrNull(receiverIndex: Int) {
- while (true) {
- doReceived(channel.receiveOrNull() ?: break)
- receivedBy[receiverIndex]++
- }
- }
-
- private suspend fun doIterator(receiverIndex: Int) {
- for (event in channel) {
- doReceived(event)
- receivedBy[receiverIndex]++
- }
- }
-}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
index 669607e..741f5a9 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
@@ -103,14 +103,14 @@
expect(1)
launch(context) {
expect(3)
- assertEquals(null, q.pool())
+ assertEquals(null, q.poll())
expect(4)
assertEquals(2, q.receive())
expect(7)
- assertEquals(null, q.pool())
+ assertEquals(null, q.poll())
yield()
expect(9)
- assertEquals(3, q.pool())
+ assertEquals(3, q.poll())
expect(10)
}
expect(2)
@@ -123,4 +123,110 @@
q.send(3)
finish(11)
}
+
+ @Test
+ fun testIteratorClosed() = runBlocking {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(context) {
+ expect(3)
+ q.close()
+ expect(4)
+ }
+ expect(2)
+ for (x in q) {
+ expectUnreached()
+ }
+ finish(5)
+ }
+
+ @Test
+ fun testIteratorOne() = runBlocking {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(context) {
+ expect(3)
+ q.send(1)
+ expect(4)
+ q.close()
+ expect(5)
+ }
+ expect(2)
+ for (x in q) {
+ expect(6)
+ assertEquals(1, x)
+ }
+ finish(7)
+ }
+
+ @Test
+ fun testIteratorOneWithYield() = runBlocking {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(context) {
+ expect(3)
+ q.send(1) // will suspend
+ expect(6)
+ q.close()
+ expect(7)
+ }
+ expect(2)
+ yield() // yield to sender coroutine right before starting for loop
+ expect(4)
+ for (x in q) {
+ expect(5)
+ assertEquals(1, x)
+ }
+ finish(8)
+ }
+
+ @Test
+ fun testIteratorTwo() = runBlocking {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(context) {
+ expect(3)
+ q.send(1)
+ expect(4)
+ q.send(2)
+ expect(7)
+ q.close()
+ expect(8)
+ }
+ expect(2)
+ for (x in q) {
+ when (x) {
+ 1 -> expect(5)
+ 2 -> expect(6)
+ else -> expectUnreached()
+ }
+ }
+ finish(9)
+ }
+
+ @Test
+ fun testIteratorTwoWithYield() = runBlocking {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(context) {
+ expect(3)
+ q.send(1) // will suspend
+ expect(6)
+ q.send(2)
+ expect(7)
+ q.close()
+ expect(8)
+ }
+ expect(2)
+ yield() // yield to sender coroutine right before starting for loop
+ expect(4)
+ for (x in q) {
+ when (x) {
+ 1 -> expect(5)
+ 2 -> expect(9)
+ else -> expectUnreached()
+ }
+ }
+ finish(10)
+ }
}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
new file mode 100644
index 0000000..1464151
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
@@ -0,0 +1,40 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.runBlocking
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import kotlin.test.assertEquals
+
+@RunWith(Parameterized::class)
+class SimpleSendReceiveTest(
+ val kind: TestChannelKind,
+ val n: Int
+) {
+ companion object {
+ @Parameterized.Parameters(name = "{0}, n={1}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> = TestChannelKind.values().flatMap { kind ->
+ listOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100, 1000).map { n ->
+ arrayOf<Any>(kind, n)
+ }
+ }
+ }
+
+ val channel = kind.create()
+
+ @Test
+ fun testSimpleSendReceive() = runBlocking {
+ launch(CommonPool) {
+ repeat(n) { channel.send(it) }
+ channel.close()
+ }
+ var received = 0
+ for (x in channel) {
+ assertEquals(received++, x)
+ }
+ assertEquals(n, received)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
new file mode 100644
index 0000000..e5533f4
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -0,0 +1,19 @@
+package kotlinx.coroutines.experimental.channels
+
+enum class TestChannelKind {
+ RENDEZVOUS {
+ override fun create(): Channel<Int> = RendezvousChannel<Int>()
+ override fun toString(): String = "RendezvousChannel"
+ },
+ ARRAY_1 {
+ override fun create(): Channel<Int> = ArrayChannel<Int>(1)
+ override fun toString(): String = "ArrayChannel(1)"
+ },
+ ARRAY_10 {
+ override fun create(): Channel<Int> = ArrayChannel<Int>(8)
+ override fun toString(): String = "ArrayChannel(8)"
+ }
+ ;
+
+ abstract fun create(): Channel<Int>
+}