ArrayChannel implementation and tests
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
new file mode 100644
index 0000000..2f837e5
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
@@ -0,0 +1,133 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+
+class ArrayChannelTest : TestBase() {
+    @Test
+    fun testSimple() = runBlocking {
+        val q = ArrayChannel<Int>(1)
+        check(q.isEmpty && !q.isFull)
+        expect(1)
+        val sender = launch(context) {
+            expect(4)
+            q.send(1) // success -- buffered
+            check(!q.isEmpty && q.isFull)
+            expect(5)
+            q.send(2) // suspends (buffer full)
+            expect(9)
+        }
+        expect(2)
+        val receiver = launch(context) {
+            expect(6)
+            check(q.receive() == 1) // does not suspend -- took from buffer
+            check(!q.isEmpty && q.isFull) // waiting sender's element moved to buffer
+            expect(7)
+            check(q.receive() == 2) // does not suspend (takes from sender)
+            expect(8)
+        }
+        expect(3)
+        sender.join()
+        receiver.join()
+        check(q.isEmpty && !q.isFull)
+        finish(10)
+    }
+
+    @Test
+    fun testStress() = runBlocking {
+        val n = 100_000
+        val q = ArrayChannel<Int>(1)
+        val sender = launch(context) {
+            for (i in 1..n) q.send(i)
+            expect(2)
+        }
+        val receiver = launch(context) {
+            for (i in 1..n) check(q.receive() == i)
+            expect(3)
+        }
+        expect(1)
+        sender.join()
+        receiver.join()
+        finish(4)
+    }
+
+    @Test
+    fun testClosedBufferedReceiveOrNull() = runBlocking {
+        val q = ArrayChannel<Int>(1)
+        check(q.isEmpty && !q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+        expect(1)
+        launch(context) {
+            expect(5)
+            check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+            assertEquals(42, q.receiveOrNull())
+            expect(6)
+            check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+            assertEquals(null, q.receiveOrNull())
+            expect(7)
+        }
+        expect(2)
+        q.send(42) // buffers
+        expect(3)
+        q.close() // goes on
+        expect(4)
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+        yield()
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+        finish(8)
+    }
+
+    @Test
+    fun testClosedExceptions() = runBlocking {
+        val q = ArrayChannel<Int>(1)
+        expect(1)
+        launch(context) {
+            expect(4)
+            try { q.receive() }
+            catch (e: ClosedReceiveChannelException) {
+                expect(5)
+            }
+        }
+        expect(2)
+        q.close()
+        expect(3)
+        yield()
+        expect(6)
+        try { q.send(42) }
+        catch (e: ClosedSendChannelException) {
+            finish(7)
+        }
+    }
+
+    @Test
+    fun testOfferAndPool() = runBlocking {
+        val q = ArrayChannel<Int>(1)
+        assertTrue(q.offer(1))
+        expect(1)
+        launch(context) {
+            expect(3)
+            assertEquals(1, q.poll())
+            expect(4)
+            assertEquals(null, q.poll())
+            expect(5)
+            assertEquals(2, q.receive()) // suspends
+            expect(9)
+            assertEquals(3, q.poll())
+            expect(10)
+            assertEquals(null, q.poll())
+            expect(11)
+        }
+        expect(2)
+        yield()
+        expect(6)
+        assertTrue(q.offer(2))
+        expect(7)
+        assertTrue(q.offer(3))
+        expect(8)
+        assertFalse(q.offer(4))
+        yield()
+        finish(12)
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
similarity index 86%
rename from kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
rename to kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
index d444a5d..a395ce4 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
@@ -2,13 +2,22 @@
 
 import kotlinx.coroutines.experimental.*
 import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
 import java.util.*
 import kotlin.test.assertEquals
 
-class RendezvousChannelAtomicityStressTest {
+@RunWith(Parameterized::class)
+class ChannelAtomicCancelStressTest(val kind: TestChannelKind) {
+    companion object {
+        @Parameterized.Parameters(name = "{0}")
+        @JvmStatic
+        fun params(): Collection<Array<Any>> = TestChannelKind.values().map { arrayOf<Any>(it) }
+    }
+
     val TEST_DURATION = 3000L
 
-    val channel = RendezvousChannel<Int>()
+    val channel = kind.create()
     val senderDone = RendezvousChannel<Boolean>()
     val receiverDone = RendezvousChannel<Boolean>()
 
@@ -25,7 +34,7 @@
     lateinit var receiver: Job
 
     @Test
-    fun testStress() = runBlocking {
+    fun testAtomicCancelStress() = runBlocking {
         val deadline = System.currentTimeMillis() + TEST_DURATION
         launchSender()
         launchReceiver()
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
new file mode 100644
index 0000000..32a33ca
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
@@ -0,0 +1,106 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.test.assertEquals
+import kotlin.test.assertTrue
+
+
+@RunWith(Parameterized::class)
+class ChannelSendReceiveStressTest(
+    val kind: TestChannelKind,
+    val nSenders: Int,
+    val nReceivers: Int
+) {
+    companion object {
+        @Parameterized.Parameters(name = "{0}, nSenders={1}, nReceivers={2}")
+        @JvmStatic
+        fun params(): Collection<Array<Any>> =
+                listOf(1, 2, 10).flatMap { nSenders ->
+                    listOf(1, 6).flatMap { nReceivers ->
+                        TestChannelKind.values().map { arrayOf<Any>(it, nSenders, nReceivers) }
+                    }
+                }
+    }
+
+    val nEvents = 1_000_000
+
+    val channel = kind.create()
+    val sendersCompleted = AtomicInteger()
+    val receiversCompleted = AtomicInteger()
+    val dupes = AtomicInteger()
+    val received = ConcurrentHashMap<Int,Int>()
+    val receivedBy = IntArray(nReceivers)
+
+    @Test
+    fun testSendReceiveStress() = runBlocking {
+        val receivers = List(nReceivers) { receiverIndex ->
+            // different event receivers use different code
+            launch(CommonPool + CoroutineName("receiver$receiverIndex")) {
+                when (receiverIndex % 3) {
+                    0 -> doReceive(receiverIndex)
+                    1 -> doReceiveOrNull(receiverIndex)
+                    2 -> doIterator(receiverIndex)
+                }
+                receiversCompleted.incrementAndGet()
+            }
+        }
+        val senders = List(nSenders) { senderIndex ->
+            launch(CommonPool + CoroutineName("sender$senderIndex")) {
+                for (i in senderIndex until nEvents step nSenders)
+                    channel.send(i)
+                sendersCompleted.incrementAndGet()
+            }
+        }
+        senders.forEach { it.join() }
+        channel.close()
+        receivers.forEach { it.join() }
+        println("Tested $kind with nSenders=$nSenders, nReceivers=$nReceivers")
+        println("Completed successfully ${sendersCompleted.get()} sender coroutines")
+        println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
+        println("              Received ${received.size} events")
+        println("        Received dupes ${dupes.get()}")
+        repeat(nReceivers) { receiveIndex ->
+            println("        Received by #$receiveIndex ${receivedBy[receiveIndex]}")
+        }
+        assertEquals(nSenders, sendersCompleted.get())
+        assertEquals(nReceivers, receiversCompleted.get())
+        assertEquals(0, dupes.get())
+        assertEquals(nEvents, received.size)
+        repeat(nReceivers) { receiveIndex ->
+            assertTrue(receivedBy[receiveIndex] > 0, "Each receiver should have received something")
+        }
+    }
+
+    private fun doReceived(receiverIndex: Int, event: Int) {
+        if (received.put(event, event) != null) {
+            println("Duplicate event $event")
+            dupes.incrementAndGet()
+        }
+        receivedBy[receiverIndex]++
+    }
+
+    private suspend fun doReceive(receiverIndex: Int) {
+        while (true) {
+            try { doReceived(receiverIndex, channel.receive()) }
+            catch (ex: ClosedReceiveChannelException) { break }
+        }
+
+    }
+
+    private suspend fun doReceiveOrNull(receiverIndex: Int) {
+        while (true) {
+            doReceived(receiverIndex, channel.receiveOrNull() ?: break)
+        }
+    }
+
+    private suspend fun doIterator(receiverIndex: Int) {
+        for (event in channel) {
+            doReceived(receiverIndex, event)
+        }
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt
deleted file mode 100644
index a8269c3..0000000
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt
+++ /dev/null
@@ -1,84 +0,0 @@
-package kotlinx.coroutines.experimental.channels
-
-import kotlinx.coroutines.experimental.CommonPool
-import kotlinx.coroutines.experimental.join
-import kotlinx.coroutines.experimental.launch
-import kotlinx.coroutines.experimental.runBlocking
-import org.junit.Test
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicInteger
-import kotlin.test.assertEquals
-import kotlin.test.assertTrue
-
-class RendezvousChannelMultipleReceiversStressTest {
-    val nEvents = 1_000_000
-    val nReceivers = 6
-
-    val channel = RendezvousChannel<Int>()
-    val completedSuccessfully = AtomicInteger()
-    val dupes = AtomicInteger()
-    val received = ConcurrentHashMap<Int,Int>()
-    val receivedBy = IntArray(nReceivers)
-
-    @Test
-    fun testStress() = runBlocking {
-        val receivers = List(nReceivers) { receiverIndex ->
-            // different event receivers use different code
-            launch(CommonPool) {
-                when (receiverIndex % 3) {
-                    0 -> doReceive(receiverIndex)
-                    1 -> doReceiveOrNull(receiverIndex)
-                    2 -> doIterator(receiverIndex)
-                }
-                completedSuccessfully.incrementAndGet()
-            }
-        }
-        repeat(nEvents) {
-            channel.send(it)
-        }
-        channel.close()
-        receivers.forEach { it.join() }
-        println("Completed successfully ${completedSuccessfully.get()} coroutines")
-        println("              Received ${received.size} events")
-        println("        Received dupes ${dupes.get()}")
-        repeat(nReceivers) { receiveIndex ->
-            println("        Received by #$receiveIndex ${receivedBy[receiveIndex]}")
-        }
-        assertEquals(nReceivers, completedSuccessfully.get())
-        assertEquals(0, dupes.get())
-        assertEquals(nEvents, received.size)
-        repeat(nReceivers) { receiveIndex ->
-            assertTrue(receivedBy[receiveIndex] > nEvents / nReceivers / 2, "Should be balanced")
-        }
-    }
-
-    private fun doReceived(event: Int) {
-        if (received.put(event, event) != null) {
-            println("Duplicate event $event")
-            dupes.incrementAndGet()
-        }
-    }
-
-    private suspend fun doReceive(receiverIndex: Int) {
-        while (true) {
-            try { doReceived(channel.receive()) }
-            catch (ex: ClosedReceiveChannelException) { break }
-            receivedBy[receiverIndex]++
-        }
-
-    }
-
-    private suspend fun doReceiveOrNull(receiverIndex: Int) {
-        while (true) {
-            doReceived(channel.receiveOrNull() ?: break)
-            receivedBy[receiverIndex]++
-        }
-    }
-
-    private suspend fun doIterator(receiverIndex: Int) {
-        for (event in channel) {
-            doReceived(event)
-            receivedBy[receiverIndex]++
-        }
-    }
-}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
index 669607e..741f5a9 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
@@ -103,14 +103,14 @@
         expect(1)
         launch(context) {
             expect(3)
-            assertEquals(null, q.pool())
+            assertEquals(null, q.poll())
             expect(4)
             assertEquals(2, q.receive())
             expect(7)
-            assertEquals(null, q.pool())
+            assertEquals(null, q.poll())
             yield()
             expect(9)
-            assertEquals(3, q.pool())
+            assertEquals(3, q.poll())
             expect(10)
         }
         expect(2)
@@ -123,4 +123,110 @@
         q.send(3)
         finish(11)
     }
+
+    @Test
+    fun testIteratorClosed() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(context) {
+            expect(3)
+            q.close()
+            expect(4)
+        }
+        expect(2)
+        for (x in q) {
+            expectUnreached()
+        }
+        finish(5)
+    }
+
+    @Test
+    fun testIteratorOne() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(context) {
+            expect(3)
+            q.send(1)
+            expect(4)
+            q.close()
+            expect(5)
+        }
+        expect(2)
+        for (x in q) {
+            expect(6)
+            assertEquals(1, x)
+        }
+        finish(7)
+    }
+
+    @Test
+    fun testIteratorOneWithYield() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(context) {
+            expect(3)
+            q.send(1) // will suspend
+            expect(6)
+            q.close()
+            expect(7)
+        }
+        expect(2)
+        yield() // yield to sender coroutine right before starting for loop
+        expect(4)
+        for (x in q) {
+            expect(5)
+            assertEquals(1, x)
+        }
+        finish(8)
+    }
+
+    @Test
+    fun testIteratorTwo() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(context) {
+            expect(3)
+            q.send(1)
+            expect(4)
+            q.send(2)
+            expect(7)
+            q.close()
+            expect(8)
+        }
+        expect(2)
+        for (x in q) {
+            when (x) {
+                1 -> expect(5)
+                2 -> expect(6)
+                else -> expectUnreached()
+            }
+        }
+        finish(9)
+    }
+
+    @Test
+    fun testIteratorTwoWithYield() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(context) {
+            expect(3)
+            q.send(1) // will suspend
+            expect(6)
+            q.send(2)
+            expect(7)
+            q.close()
+            expect(8)
+        }
+        expect(2)
+        yield() // yield to sender coroutine right before starting for loop
+        expect(4)
+        for (x in q) {
+            when (x) {
+                1 -> expect(5)
+                2 -> expect(9)
+                else -> expectUnreached()
+            }
+        }
+        finish(10)
+    }
 }
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
new file mode 100644
index 0000000..1464151
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
@@ -0,0 +1,40 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.runBlocking
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import kotlin.test.assertEquals
+
+@RunWith(Parameterized::class)
+class SimpleSendReceiveTest(
+    val kind: TestChannelKind,
+    val n: Int
+) {
+    companion object {
+        @Parameterized.Parameters(name = "{0}, n={1}")
+        @JvmStatic
+        fun params(): Collection<Array<Any>> = TestChannelKind.values().flatMap { kind ->
+            listOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100, 1000).map { n ->
+                arrayOf<Any>(kind, n)
+            }
+        }
+    }
+
+    val channel = kind.create()
+
+    @Test
+    fun testSimpleSendReceive() = runBlocking {
+        launch(CommonPool) {
+            repeat(n) { channel.send(it) }
+            channel.close()
+        }
+        var received = 0
+        for (x in channel) {
+            assertEquals(received++, x)
+        }
+        assertEquals(n, received)
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
new file mode 100644
index 0000000..e5533f4
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -0,0 +1,19 @@
+package kotlinx.coroutines.experimental.channels
+
+enum class TestChannelKind {
+    RENDEZVOUS {
+        override fun create(): Channel<Int> = RendezvousChannel<Int>()
+        override fun toString(): String = "RendezvousChannel"
+    },
+    ARRAY_1 {
+        override fun create(): Channel<Int> = ArrayChannel<Int>(1)
+        override fun toString(): String = "ArrayChannel(1)"
+    },
+    ARRAY_10 {
+        override fun create(): Channel<Int> = ArrayChannel<Int>(8)
+        override fun toString(): String = "ArrayChannel(8)"
+    }
+    ;
+
+    abstract fun create(): Channel<Int>
+}