Migrate channels and related operators to common, so channels can be used from JS

Fixes #201
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
new file mode 100644
index 0000000..84da40f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ArrayBroadcastChannelTest : TestBase() {
+
+    @Test
+    fun testBasic() = runTest {
+        expect(1)
+        val broadcast = ArrayBroadcastChannel<Int>(1)
+        assertFalse(broadcast.isClosedForSend)
+        val first = broadcast.openSubscription()
+        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+            expect(2)
+            assertEquals(1, first.receive()) // suspends
+            assertFalse(first.isClosedForReceive)
+            expect(5)
+            assertEquals(2, first.receive()) // suspends
+            assertFalse(first.isClosedForReceive)
+            expect(10)
+            assertNull(first.receiveOrNull()) // suspends
+            assertTrue(first.isClosedForReceive)
+            expect(14)
+        }
+        expect(3)
+        broadcast.send(1)
+        expect(4)
+        yield() // to the first receiver
+        expect(6)
+
+        val second = broadcast.openSubscription()
+        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+            expect(7)
+            assertEquals(2, second.receive()) // suspends
+            assertFalse(second.isClosedForReceive)
+            expect(11)
+            assertNull(second.receiveOrNull()) // suspends
+            assertTrue(second.isClosedForReceive)
+            expect(15)
+        }
+        expect(8)
+        broadcast.send(2)
+        expect(9)
+        yield() // to first & second receivers
+        expect(12)
+        broadcast.close()
+        expect(13)
+        assertTrue(broadcast.isClosedForSend)
+        yield() // to first & second receivers
+        finish(16)
+    }
+
+    @Test
+    fun testSendSuspend() = runTest {
+        expect(1)
+        val broadcast = ArrayBroadcastChannel<Int>(1)
+        val first = broadcast.openSubscription()
+        launch(coroutineContext) {
+            expect(4)
+            assertEquals(1, first.receive())
+            expect(5)
+            assertEquals(2, first.receive())
+            expect(6)
+        }
+        expect(2)
+        broadcast.send(1) // puts to buffer, receiver not running yet
+        expect(3)
+        broadcast.send(2) // suspends
+        finish(7)
+    }
+
+    @Test
+    fun testConcurrentSendCompletion() = runTest {
+        expect(1)
+        val broadcast = ArrayBroadcastChannel<Int>(1)
+        val sub = broadcast.openSubscription()
+        // launch 3 concurrent senders (one goes to the buffer, the other two suspend)
+        for (x in 1..3) {
+            launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+                expect(x + 1) // senders are started UNDISPATCHED, producing steps 2, 3 and 4 in order
+                broadcast.send(x)
+            }
+        }
+        // and close the channel for sending
+        expect(5)
+        broadcast.close()
+        // all 3 items must still be received after the close
+        expect(6)
+        assertFalse(sub.isClosedForReceive)
+        for (x in 1..3)
+            assertEquals(x, sub.receiveOrNull())
+        // and then receive the close signal
+        assertNull(sub.receiveOrNull())
+        assertTrue(sub.isClosedForReceive)
+        finish(7)
+    }
+
+    @Test
+    fun testForgetUnsubscribed() = runTest {
+        expect(1)
+        val broadcast = ArrayBroadcastChannel<Int>(1)
+        broadcast.send(1)
+        broadcast.send(2)
+        broadcast.send(3)
+        expect(2) // should not suspend anywhere above
+        val sub = broadcast.openSubscription()
+        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+            expect(3)
+            assertEquals(4, sub.receive()) // suspends
+            expect(5)
+        }
+        expect(4)
+        broadcast.send(4) // sends
+        yield()
+        finish(6)
+    }
+
+    @Test
+    fun testReceiveFullAfterClose() = runTest {
+        val channel = BroadcastChannel<Int>(10)
+        val sub = channel.openSubscription()
+        // generate into buffer & close
+        for (x in 1..5) channel.send(x)
+        channel.close()
+        // make sure all of them are consumed
+        check(!sub.isClosedForReceive)
+        for (x in 1..5) check(sub.receive() == x)
+        check(sub.receiveOrNull() == null)
+        check(sub.isClosedForReceive)
+    }
+
+    @Test
+    fun testCloseSubDuringIteration() = runTest {
+        val channel = BroadcastChannel<Int>(1)
+        // launch generator (for later) in this context
+        launch(coroutineContext) {
+            for (x in 1..5) channel.send(x)
+            channel.close()
+        }
+        // start consuming
+        val sub = channel.openSubscription()
+        var expected = 0
+        sub.consumeEach {
+            check(it == ++expected)
+            if (it == 2) {
+                sub.close()
+            }
+        }
+        check(expected == 2)
+    }
+
+    @Test
+    fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) {
+        val channel = BroadcastChannel<Int>(1)
+        val sub = channel.openSubscription()
+        assertFalse(sub.isClosedForReceive)
+        sub.close()
+        assertTrue(sub.isClosedForReceive)
+        sub.receive()
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
new file mode 100644
index 0000000..61fdaef
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ArrayChannelTest : TestBase() {
+
+    @Test
+    fun testSimple() = runTest {
+        val q = ArrayChannel<Int>(1)
+        check(q.isEmpty && !q.isFull)
+        expect(1)
+        val sender = launch(coroutineContext) {
+            expect(4)
+            q.send(1) // success -- buffered
+            check(!q.isEmpty && q.isFull)
+            expect(5)
+            q.send(2) // suspends (buffer full)
+            expect(9)
+        }
+        expect(2)
+        val receiver = launch(coroutineContext) {
+            expect(6)
+            check(q.receive() == 1) // does not suspend -- took from buffer
+            check(!q.isEmpty && q.isFull) // waiting sender's element moved to buffer
+            expect(7)
+            check(q.receive() == 2) // does not suspend (takes from sender)
+            expect(8)
+        }
+        expect(3)
+        sender.join()
+        receiver.join()
+        check(q.isEmpty && !q.isFull)
+        finish(10)
+    }
+
+    @Test
+    fun testClosedBufferedReceiveOrNull() = runTest {
+        val q = ArrayChannel<Int>(1)
+        check(q.isEmpty && !q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+        expect(1)
+        launch(coroutineContext) {
+            expect(5)
+            check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+            assertEquals(42, q.receiveOrNull())
+            expect(6)
+            check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+            assertEquals(null, q.receiveOrNull())
+            expect(7)
+        }
+        expect(2)
+        q.send(42) // buffers
+        expect(3)
+        q.close() // goes on
+        expect(4)
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+        yield()
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+        finish(8)
+    }
+
+    @Test
+    fun testClosedExceptions() = runTest {
+        val q = ArrayChannel<Int>(1)
+        expect(1)
+        launch(coroutineContext) {
+            expect(4)
+            try { q.receive() }
+            catch (e: ClosedReceiveChannelException) {
+                expect(5)
+            }
+        }
+        expect(2)
+
+        require(q.close())
+        expect(3)
+        yield()
+        expect(6)
+        try { q.send(42) }
+        catch (e: ClosedSendChannelException) {
+            finish(7)
+        }
+    }
+
+    @Test
+    fun testOfferAndPool() = runTest {
+        val q = ArrayChannel<Int>(1)
+        assertTrue(q.offer(1))
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            assertEquals(1, q.poll())
+            expect(4)
+            assertEquals(null, q.poll())
+            expect(5)
+            assertEquals(2, q.receive()) // suspends
+            expect(9)
+            assertEquals(3, q.poll())
+            expect(10)
+            assertEquals(null, q.poll())
+            expect(11)
+        }
+        expect(2)
+        yield()
+        expect(6)
+        assertTrue(q.offer(2))
+        expect(7)
+        assertTrue(q.offer(3))
+        expect(8)
+        assertFalse(q.offer(4))
+        yield()
+        finish(12)
+    }
+
+    @Test
+    fun testConsumeAll() = runTest {
+        val q = ArrayChannel<Int>(5)
+        for (i in 1..10) {
+            if (i <= 5) {
+                expect(i)
+                q.send(i) // fits into the buffer of capacity 5, so it shall not suspend
+            } else {
+                launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+                    expect(i)
+                    q.send(i) // suspends: the buffer is already full
+                    expectUnreached() // the suspended send must never resume normally once q.cancel() runs
+                }
+            }
+        }
+        expect(11)
+        q.cancel() // expected to close the channel in both directions and drop its contents (checked below)
+        check(q.isClosedForSend)
+        check(q.isClosedForReceive)
+        check(q.receiveOrNull() == null)
+        finish(12)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt
new file mode 100644
index 0000000..2bbc4a1
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlin.test.*
+
+
+class BroadcastChannelFactoryTest {
+
+    @Test
+    fun testRendezvousChannelNotSupported() { // capacity 0 must be rejected
+        assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(0) }
+    }
+
+    @Test
+    fun testLinkedListChannelNotSupported() { // Channel.UNLIMITED must be rejected
+        assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(Channel.UNLIMITED) }
+    }
+
+    @Test
+    fun testConflatedBroadcastChannel() { // Channel.CONFLATED selects ConflatedBroadcastChannel
+        assertTrue(BroadcastChannel<Int>(Channel.CONFLATED) is ConflatedBroadcastChannel)
+    }
+
+    @Test
+    fun testArrayBroadcastChannel() { // capacities 1 and 10 select ArrayBroadcastChannel
+        assertTrue(BroadcastChannel<Int>(1) is ArrayBroadcastChannel)
+        assertTrue(BroadcastChannel<Int>(10) is ArrayBroadcastChannel)
+    }
+
+    @Test
+    fun testInvalidCapacityNotSupported() { // capacity -2 must be rejected
+        assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(-2) }
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt
new file mode 100644
index 0000000..d4c5126
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.test.*
+
+
+class ChannelFactoryTest : TestBase() {
+
+    @Test
+    fun testRendezvousChannel() {
+        assertTrue(Channel<Int>() is RendezvousChannel)
+        assertTrue(Channel<Int>(0) is RendezvousChannel)
+    }
+
+    @Test
+    fun testLinkedListChannel() {
+        assertTrue(Channel<Int>(Channel.UNLIMITED) is LinkedListChannel)
+    }
+
+    @Test
+    fun testConflatedChannel() {
+        assertTrue(Channel<Int>(Channel.CONFLATED) is ConflatedChannel)
+    }
+
+    @Test
+    fun testArrayChannel() {
+        assertTrue(Channel<Int>(1) is ArrayChannel)
+        assertTrue(Channel<Int>(10) is ArrayChannel)
+    }
+
+    @Test
+    fun testInvalidCapacityNotSupported() { // the factory call does not suspend, so no runTest wrapper is needed
+        assertFailsWith<IllegalArgumentException> { Channel<Int>(-2) }
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
new file mode 100644
index 0000000..0fe1fc9
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
@@ -0,0 +1,569 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.math.*
+import kotlin.test.*
+
+class ChannelsTest: TestBase() {
+    private val testList = listOf(1, 2, 3)
+
+    @Test
+    fun testIterableAsReceiveChannel() = runTest {
+        assertEquals(testList, testList.asReceiveChannel().toList())
+    }
+
+    @Test
+    fun testSequenceAsReceiveChannel() = runTest {
+        assertEquals(testList, testList.asSequence().asReceiveChannel().toList())
+    }
+
+    @Test
+    fun testAssociate() = runTest {
+        assertEquals(testList.associate { it * 2 to it * 3 },
+            testList.asReceiveChannel().associate { it * 2 to it * 3 }.toMap())
+    }
+
+    @Test
+    fun testAssociateBy() = runTest {
+        assertEquals(testList.associateBy { it % 2 }, testList.asReceiveChannel().associateBy { it % 2 })
+    }
+
+    @Test
+    fun testAssociateBy2() = runTest {
+        assertEquals(testList.associateBy({ it * 2}, { it * 3 }),
+            testList.asReceiveChannel().associateBy({ it * 2}, { it * 3 }).toMap())
+    }
+
+    @Test
+    fun testDistinct() = runTest {
+        assertEquals(testList.map { it % 2 }.distinct(), testList.asReceiveChannel().map { it % 2 }.distinct().toList())
+    }
+
+    @Test
+    fun testDistinctBy() = runTest {
+        assertEquals(testList.distinctBy { it % 2 }.toList(), testList.asReceiveChannel().distinctBy { it % 2 }.toList())
+    }
+
+    @Test
+    fun testToCollection() = runTest {
+        val target = mutableListOf<Int>()
+        testList.asReceiveChannel().toCollection(target)
+        assertEquals(testList, target)
+    }
+
+    @Test
+    fun testDrop() = runTest {
+        (0..testList.size).forEach { i -> // drop counts from 0 up to and including the list size
+            assertEquals(testList.drop(i), testList.asReceiveChannel().drop(i).toList(), "Drop $i")
+        }
+    }
+
+    @Test
+    fun testElementAtOrElse() = runTest {
+        assertEquals(testList.elementAtOrElse(2) { 42 }, testList.asReceiveChannel().elementAtOrElse(2) { 42 })
+        assertEquals(testList.elementAtOrElse(9) { 42 }, testList.asReceiveChannel().elementAtOrElse(9) { 42 })
+    }
+
+    @Test
+    fun testFirst() = runTest {
+        assertEquals(testList.first(), testList.asReceiveChannel().first())
+        for (i in testList) {
+            assertEquals(testList.first { it == i }, testList.asReceiveChannel().first { it == i })
+        }
+        try { // testList holds no 9, so the predicate never matches
+            testList.asReceiveChannel().first { it == 9 }
+            fail() // reached only if first { } returned instead of throwing
+        } catch (nse: NoSuchElementException) { // expected outcome; nothing further to assert
+        }
+    }
+
+    @Test
+    fun testFirstOrNull() = runTest {
+        assertEquals(testList.firstOrNull(), testList.asReceiveChannel().firstOrNull())
+        assertEquals(testList.firstOrNull { it == 2 }, testList.asReceiveChannel().firstOrNull { it == 2 })
+        assertEquals(testList.firstOrNull { it == 9 }, testList.asReceiveChannel().firstOrNull { it == 9 })
+    }
+
+    @Test
+    fun testFlatMap() = runTest {
+        assertEquals(testList.flatMap { (0..it).toList() }, testList.asReceiveChannel().flatMap { (0..it).asReceiveChannel() }.toList())
+
+    }
+
+    @Test
+    fun testFold() = runTest {
+        assertEquals(testList.fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } },
+            testList.asReceiveChannel().fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } }.toList())
+    }
+
+    @Test
+    fun testFoldIndexed() = runTest {
+        assertEquals(testList.foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } },
+            testList.asReceiveChannel().foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } }.toList())
+    }
+
+    @Test
+    fun testGroupBy() = runTest {
+        assertEquals(testList.groupBy { it % 2 }, testList.asReceiveChannel().groupBy { it % 2 })
+    }
+
+    @Test
+    fun testGroupBy2() = runTest {
+        assertEquals(testList.groupBy({ -it }, { it + 100 }), testList.asReceiveChannel().groupBy({ -it }, { it + 100 }).toMap())
+
+    }
+
+    @Test
+    fun testMap() = runTest {
+        assertEquals(testList.map { it + 10 }, testList.asReceiveChannel().map { it + 10 }.toList())
+
+    }
+
+    @Test
+    fun testMapToCollection() = runTest {
+        val c = mutableListOf<Int>()
+        testList.asReceiveChannel().mapTo(c) { it + 10 }
+        assertEquals(testList.map { it + 10 }, c)
+    }
+
+    @Test
+    fun testMapToSendChannel() = runTest {
+        val c = produce<Int> {
+            testList.asReceiveChannel().mapTo(channel) { it + 10 }
+        }
+        assertEquals(testList.map { it + 10 }, c.toList())
+    }
+
+    @Test
+    fun testEmptyList() = runTest {
+        assertTrue(emptyList<Nothing>().asReceiveChannel().toList().isEmpty())
+    }
+
+    @Test
+    fun testToList() = runTest {
+        assertEquals(testList, testList.asReceiveChannel().toList())
+
+    }
+
+    @Test
+    fun testEmptySet() = runTest {
+        assertTrue(emptyList<Nothing>().asReceiveChannel().toSet().isEmpty())
+
+    }
+
+    @Test
+    fun testToSet() = runTest {
+        assertEquals(testList.toSet(), testList.asReceiveChannel().toSet())
+    }
+
+    @Test
+    fun testToMutableSet() = runTest {
+        assertEquals(testList.toMutableSet(), testList.asReceiveChannel().toMutableSet())
+    }
+
+    @Test
+    fun testEmptySequence() = runTest {
+        val channel = Channel<Nothing>()
+        channel.close()
+        assertEquals(0, channel.count()) // a channel closed without any elements must count as empty
+        assertTrue(emptyList<Nothing>().asReceiveChannel().count() == 0)
+    }
+
+    @Test
+    fun testEmptyMap() = runTest {
+        val channel = Channel<Pair<Nothing, Nothing>>()
+        channel.close()
+
+        assertTrue(channel.toMap().isEmpty())
+    }
+
+    @Test
+    fun testToMap() = runTest {
+        val values = testList.map { it to it.toString() }
+        assertEquals(values.toMap(), values.asReceiveChannel().toMap())
+    }
+
+    @Test
+    fun testReduce() = runTest {
+        assertEquals(testList.reduce { acc, e -> acc * e },
+            testList.asReceiveChannel().reduce { acc, e -> acc * e })
+    }
+
+    @Test
+    fun testReduceIndexed() = runTest {
+        assertEquals(testList.reduceIndexed { index, acc, e -> index + acc * e },
+            testList.asReceiveChannel().reduceIndexed { index, acc, e -> index + acc * e })
+    }
+
+    @Test
+    fun testTake() = runTest {
+        (0..testList.size).forEach { n -> // take counts from 0 up to and including the list size
+            assertEquals(testList.take(n), testList.asReceiveChannel().take(n).toList())
+        }
+    }
+
+    @Test
+    fun testPartition() = runTest {
+        assertEquals(testList.partition { it % 2 == 0 }, testList.asReceiveChannel().partition { it % 2 == 0 })
+    }
+
+    @Test
+    fun testZip() = runTest {
+        val other = listOf("a", "b")
+        assertEquals(testList.zip(other), testList.asReceiveChannel().zip(other.asReceiveChannel()).toList())
+    }
+
+    @Test
+    fun testElementAt() = runTest {
+        for (index in testList.indices) { // every valid position of the list
+            assertEquals(testList[index], testList.asReceiveChannel().elementAt(index))
+        }
+    }
+
+    @Test
+    fun testElementAtOrNull() = runTest {
+        for (index in testList.indices) { // valid positions yield the element itself
+            assertEquals(testList[index], testList.asReceiveChannel().elementAtOrNull(index))
+        }
+        assertNull(testList.asReceiveChannel().elementAtOrNull(-1)) // below the valid range
+        assertNull(testList.asReceiveChannel().elementAtOrNull(testList.size)) // one past the last index
+    }
+
+    @Test
+    fun testFind() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.find { it % 2 == mod },
+                testList.asReceiveChannel().find { it % 2 == mod })
+        }
+    }
+
+    @Test
+    fun testFindLast() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.findLast { it % 2 == mod }, testList.asReceiveChannel().findLast { it % 2 == mod })
+        }
+    }
+
+    @Test
+    fun testIndexOf() = runTest {
+        repeat(testList.size + 1) { i ->
+            assertEquals(testList.indexOf(i), testList.asReceiveChannel().indexOf(i))
+        }
+    }
+
+    @Test
+    fun testLastIndexOf() = runTest {
+        repeat(testList.size + 1) { i ->
+            assertEquals(testList.lastIndexOf(i), testList.asReceiveChannel().lastIndexOf(i))
+        }
+    }
+
+    @Test
+    fun testIndexOfFirst() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.indexOfFirst { it % 2 == mod },
+                testList.asReceiveChannel().indexOfFirst { it % 2 == mod })
+        }
+    }
+
+    @Test
+    fun testIndexOfLast() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.indexOfLast { it % 2 != mod },
+                testList.asReceiveChannel().indexOfLast { it % 2 != mod })
+        }
+    }
+
+    @Test
+    fun testLastOrNull() = runTest {
+        assertEquals(testList.lastOrNull(), testList.asReceiveChannel().lastOrNull())
+        assertNull(emptyList<Int>().asReceiveChannel().lastOrNull()) // an empty source has no last element
+    }
+
+    @Test
+    fun testSingleOrNull() = runTest {
+        assertEquals(1, listOf(1).asReceiveChannel().singleOrNull())
+        assertEquals(null, listOf(1, 2).asReceiveChannel().singleOrNull())
+        assertEquals(null, emptyList<Int>().asReceiveChannel().singleOrNull())
+        repeat(testList.size + 1) { i ->
+            assertEquals(testList.singleOrNull { it == i },
+                testList.asReceiveChannel().singleOrNull { it == i })
+        }
+        repeat(3) { mod ->
+            assertEquals(testList.singleOrNull { it % 2 == mod },
+                testList.asReceiveChannel().singleOrNull { it % 2 == mod })
+        }
+    }
+
+    @Test
+    fun testDropWhile() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.dropWhile { it % 2 == mod },
+                testList.asReceiveChannel().dropWhile { it % 2 == mod }.toList())
+        }
+    }
+
+    @Test
+    fun testFilter() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.filter { it % 2 == mod },
+                testList.asReceiveChannel().filter { it % 2 == mod }.toList())
+        }
+    }
+
+    @Test
+    fun testFilterToCollection() = runTest {
+        repeat(3) { mod ->
+            val c = mutableListOf<Int>()
+            testList.asReceiveChannel().filterTo(c) { it % 2 == mod }
+            assertEquals(testList.filter { it % 2 == mod }, c)
+        }
+    }
+
+    @Test
+    fun testFilterToSendChannel() = runTest {
+        repeat(3) { mod ->
+            val c = produce<Int> {
+                testList.asReceiveChannel().filterTo(channel) { it % 2 == mod }
+            }
+            assertEquals(testList.filter { it % 2 == mod }, c.toList())
+        }
+    }
+
+    @Test
+    fun testFilterNot() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.filterNot { it % 2 == mod },
+                testList.asReceiveChannel().filterNot { it % 2 == mod }.toList())
+        }
+    }
+
+    @Test
+    fun testFilterNotToCollection() = runTest {
+        repeat(3) { mod ->
+            val c = mutableListOf<Int>()
+            testList.asReceiveChannel().filterNotTo(c) { it % 2 == mod }
+            assertEquals(testList.filterNot { it % 2 == mod }, c)
+        }
+    }
+
+    @Test
+    fun testFilterNotToSendChannel() = runTest {
+        repeat(3) { mod ->
+            val c = produce<Int> {
+                testList.asReceiveChannel().filterNotTo(channel) { it % 2 == mod }
+            }
+            assertEquals(testList.filterNot { it % 2 == mod }, c.toList())
+        }
+    }
+
+    @Test
+    fun testFilterNotNull() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(),
+                testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNull().toList())
+        }
+    }
+
+    @Test
+    fun testFilterNotNullToCollection() = runTest {
+        repeat(3) { mod ->
+            val c = mutableListOf<Int>()
+            testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(c)
+            assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(), c)
+        }
+    }
+
+    @Test
+    fun testFilterNotNullToSendChannel() = runTest {
+        repeat(3) { mod ->
+            val c = produce<Int> {
+                testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(channel)
+            }
+            assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(), c.toList())
+        }
+    }
+
+    @Test
+    fun testFilterIndexed() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.filterIndexed { index, _ ->  index % 2 == mod },
+                testList.asReceiveChannel().filterIndexed { index, _ ->  index % 2 == mod }.toList())
+        }
+    }
+
+    @Test
+    fun testFilterIndexedToCollection() = runTest {
+        repeat(3) { mod ->
+            val c = mutableListOf<Int>()
+            testList.asReceiveChannel().filterIndexedTo(c) { index, _ ->  index % 2 == mod }
+            assertEquals(testList.filterIndexed { index, _ ->  index % 2 == mod }, c)
+        }
+    }
+
+    @Test
+    fun testFilterIndexedToChannel() = runTest {
+        repeat(3) { mod ->
+            val c = produce<Int> {
+                testList.asReceiveChannel().filterIndexedTo(channel) { index, _ -> index % 2 == mod }
+            }
+            assertEquals(testList.filterIndexed { index, _ ->  index % 2 == mod }, c.toList())
+        }
+    }
+
+    @Test
+    fun testTakeWhile() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.takeWhile { it % 2 != mod },
+                testList.asReceiveChannel().takeWhile { it % 2 != mod }.toList())
+        }
+    }
+
+    @Test
+    fun testToChannel() = runTest {
+        val c = produce<Int> {
+            testList.asReceiveChannel().toChannel(channel)
+        }
+        assertEquals(testList, c.toList())
+    }
+
+    @Test
+    fun testMapIndexed() = runTest {
+        assertEquals(testList.mapIndexed { index, i -> index + i },
+            testList.asReceiveChannel().mapIndexed { index, i -> index + i }.toList())
+    }
+
+    @Test
+    fun testMapIndexedToCollection() = runTest {
+        val c = mutableListOf<Int>()
+        testList.asReceiveChannel().mapIndexedTo(c) { index, i -> index + i }
+        assertEquals(testList.mapIndexed { index, i -> index + i }, c)
+    }
+
+    @Test
+    fun testMapIndexedToSendChannel() = runTest {
+        val c = produce<Int> {
+            testList.asReceiveChannel().mapIndexedTo(channel) { index, i -> index + i }
+        }
+        assertEquals(testList.mapIndexed { index, i -> index + i }, c.toList())
+    }
+
+    @Test
+    fun testMapNotNull() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } },
+                testList.asReceiveChannel().mapNotNull { i -> i.takeIf { i % 2 == mod } }.toList())
+        }
+    }
+
+    @Test
+    fun testMapNotNullToCollection() = runTest {
+        repeat(3) { mod ->
+            val c = mutableListOf<Int>()
+            testList.asReceiveChannel().mapNotNullTo(c) { i -> i.takeIf { i % 2 == mod } }
+            assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, c)
+        }
+    }
+
+    @Test
+    fun testMapNotNullToSendChannel() = runTest {
+        repeat(3) { mod ->
+            val c = produce<Int> {
+                testList.asReceiveChannel().mapNotNullTo(channel) { i -> i.takeIf { i % 2 == mod } }
+            }
+            assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, c.toList())
+        }
+    }
+
+    @Test
+    fun testMapIndexedNotNull() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } },
+                testList.asReceiveChannel().mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }.toList())
+        }
+    }
+
+    @Test
+    fun testMapIndexedNotNullToCollection() = runTest {
+        repeat(3) { mod ->
+            val c = mutableListOf<Int>()
+            testList.asReceiveChannel().mapIndexedNotNullTo(c) { index, i -> index.takeIf { i % 2 == mod } }
+            assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, c)
+        }
+    }
+
+    @Test
+    fun testMapIndexedNotNullToSendChannel() = runTest {
+        repeat(3) { mod ->
+            val c = produce<Int> {
+                testList.asReceiveChannel().mapIndexedNotNullTo(channel) { index, i -> index.takeIf { i % 2 == mod } }
+            }
+            assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, c.toList())
+        }
+    }
+
+    @Test
+    fun testWithIndex() = runTest {
+        assertEquals(testList.withIndex().toList(), testList.asReceiveChannel().withIndex().toList())
+    }
+
+    @Test
+    fun testMaxBy() = runTest {
+        assertEquals(testList.maxBy { 10 - abs(it - 2) },
+            testList.asReceiveChannel().maxBy { 10 - abs(it - 2) })
+    }
+
+    @Test
+    fun testMaxWith() = runTest {
+        val cmp = compareBy<Int> { 10 - abs(it - 2) }
+        assertEquals(testList.maxWith(cmp),
+            testList.asReceiveChannel().maxWith(cmp))
+    }
+
+    @Test
+    fun testMinBy() = runTest {
+        assertEquals(testList.minBy { abs(it - 2) },
+            testList.asReceiveChannel().minBy { abs(it - 2) })
+    }
+
+    @Test
+    fun testMinWith() = runTest {
+        val cmp = compareBy<Int> { abs(it - 2) }
+        assertEquals(testList.minWith(cmp),
+            testList.asReceiveChannel().minWith(cmp))
+    }
+
+    @Test
+    fun testSumBy() = runTest {
+        assertEquals(testList.sumBy { it * 3 },
+            testList.asReceiveChannel().sumBy { it * 3 })
+    }
+
+    @Test
+    fun testSumByDouble() = runTest {
+        val expected = testList.sumByDouble { it * 3.0 }
+        val actual = testList.asReceiveChannel().sumByDouble { it * 3.0 }
+        assertEquals(expected, actual)
+    }
+
+    @Test
+    fun testRequireNoNulls() = runTest {
+        assertEquals(testList.requireNoNulls(), testList.asReceiveChannel().requireNoNulls().toList())
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt
new file mode 100644
index 0000000..4c04f8f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ConflatedBroadcastChannelTest : TestBase() { // covers value/valueOrNull, subscriptions and close of ConflatedBroadcastChannel
+
+    @Test
+    fun testBasicScenario() = runTest { // expect(n)/finish(n) come from TestBase; presumably they assert the numbered step order
+        expect(1)
+        val broadcast = ConflatedBroadcastChannel<String>()
+        assertTrue(exceptionFromNotInline { broadcast.value } is IllegalStateException) // nothing sent yet: reading value throws
+        assertNull(broadcast.valueOrNull)
+
+        launch(coroutineContext, CoroutineStart.UNDISPATCHED) { // first subscriber: subscribes before anything is sent
+            expect(2)
+            val sub = broadcast.openSubscription()
+            assertNull(sub.poll())
+            expect(3)
+            assertEquals("one", sub.receive()) // suspends
+            expect(6)
+            assertEquals("two", sub.receive()) // suspends
+            expect(12)
+            sub.close()
+            expect(13)
+        }
+
+        expect(4)
+        broadcast.send("one") // does not suspend
+        assertEquals("one", broadcast.value)
+        assertEquals("one", broadcast.valueOrNull)
+        expect(5)
+        yield() // to receiver
+        expect(7)
+        launch(coroutineContext, CoroutineStart.UNDISPATCHED) { // second subscriber: subscribes after "one" was sent
+            expect(8)
+            val sub = broadcast.openSubscription()
+            assertEquals("one", sub.receive()) // does not suspend
+            expect(9)
+            assertEquals("two", sub.receive()) // suspends
+            expect(14)
+            assertEquals("three", sub.receive()) // suspends
+            expect(17)
+            assertNull(sub.receiveOrNull()) // suspends until closed
+            expect(20)
+            sub.close()
+            expect(21)
+        }
+
+        expect(10)
+        broadcast.send("two") // does not suspend
+        assertEquals("two", broadcast.value)
+        assertEquals("two", broadcast.valueOrNull)
+        expect(11)
+        yield() // to both receivers
+        expect(15)
+        broadcast.send("three") // does not suspend
+        assertEquals("three", broadcast.value)
+        assertEquals("three", broadcast.valueOrNull)
+        expect(16)
+        yield() // to second receiver
+        expect(18)
+        broadcast.close()
+        assertTrue(exceptionFromNotInline { broadcast.value } is IllegalStateException) // after close() reading value throws again
+        assertNull(broadcast.valueOrNull)
+        expect(19)
+        yield() // to second receiver
+        assertTrue(exceptionFrom { broadcast.send("four") } is ClosedSendChannelException)
+        finish(22)
+    }
+
+    @Test
+    fun testInitialValueAndReceiveClosed() = runTest { // channel created with an initial value, then closed while a subscriber waits
+        expect(1)
+        val broadcast = ConflatedBroadcastChannel(1)
+        assertEquals(1, broadcast.value)
+        assertEquals(1, broadcast.valueOrNull)
+        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+            expect(2)
+            val sub = broadcast.openSubscription()
+            assertEquals(1, sub.receive()) // the initial value is delivered to the new subscriber
+            expect(3)
+            assertTrue(exceptionFrom { sub.receive() } is ClosedReceiveChannelException) // suspends
+            expect(6)
+        }
+        expect(4)
+        broadcast.close()
+        expect(5)
+        yield() // to child
+        finish(7)
+    }
+
+    inline fun exceptionFrom(block: () -> Unit): Throwable? { // inline, so the block may contain suspending calls; returns what it threw
+        try {
+            block()
+            return null // block completed normally
+        } catch (e: Throwable) {
+            return e
+        }
+    }
+
+    // Ugly workaround for bug in JS compiler: non-inline twin of exceptionFrom, used above for the non-suspending `value` reads
+    fun exceptionFromNotInline(block: () -> Unit): Throwable? {
+        try {
+            block()
+            return null // block completed normally
+        } catch (e: Throwable) {
+            return e
+        }
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
new file mode 100644
index 0000000..1fd7413
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ConflatedChannelTest : TestBase() { // covers conflation (only the latest element is kept) of ConflatedChannel
+
+    @Test
+    fun testBasicConflationOfferPoll() {
+        val q = ConflatedChannel<Int>()
+        assertNull(q.poll()) // nothing offered yet
+        assertTrue(q.offer(1))
+        assertTrue(q.offer(2))
+        assertTrue(q.offer(3))
+        assertEquals(3, q.poll()) // only the last offered element is observed
+        assertNull(q.poll())
+    }
+
+    @Test
+    fun testConflatedSend() = runTest {
+        val q = ConflatedChannel<Int>()
+        q.send(1)
+        q.send(2) // shall conflate the previously sent element
+        assertEquals(2, q.receiveOrNull())
+    }
+
+    @Test
+    fun testConflatedClose() = runTest {
+        val q = ConflatedChannel<Int>()
+        q.send(1)
+        q.close() // shall conflate sent item and become closed
+        assertNull(q.receiveOrNull())
+    }
+
+    @Test
+    fun testConflationSendReceive() = runTest { // expect(n)/finish(n) come from TestBase; presumably they assert the numbered step order
+        val q = ConflatedChannel<Int>()
+        expect(1)
+        launch(coroutineContext) { // receiver coroutine
+            expect(4)
+            assertEquals(2, q.receive())
+            expect(5)
+            assertEquals(3, q.receive()) // this receive suspends
+            expect(8)
+            assertEquals(6, q.receive()) // last conflated value
+            expect(9)
+        }
+        expect(2)
+        q.send(1)
+        q.send(2) // shall conflate
+        expect(3)
+        yield() // to receiver
+        expect(6)
+        q.send(3) // send to the waiting receiver
+        q.send(4) // buffer
+        q.send(5) // conflate
+        q.send(6) // conflate again
+        expect(7)
+        yield() // to receiver
+        finish(10)
+    }
+
+    @Test
+    fun testConsumeAll() = runTest { // cancel() must leave the channel closed for both send and receive
+        val q = ConflatedChannel<Int>()
+        expect(1)
+        for (i in 1..10) {
+            q.send(i) // stores as last
+        }
+        q.cancel()
+        check(q.isClosedForSend)
+        check(q.isClosedForReceive)
+        check(q.receiveOrNull() == null) // nothing is left to receive after cancel()
+        finish(2)
+    }
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
new file mode 100644
index 0000000..897801e
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.TestBase
+import kotlin.test.*
+
+class LinkedListChannelTest : TestBase() { // buffering, double close and cancel() of LinkedListChannel
+
+    @Test
+    fun testBasic() = runTest {
+        val channel = LinkedListChannel<Int>()
+        channel.send(1)
+        check(channel.offer(2))
+        channel.send(3)
+        check(channel.close()) // first close() reports true
+        check(!channel.close()) // a repeated close() reports false
+        assertEquals(1, channel.receive())
+        assertEquals(2, channel.poll())
+        assertEquals(3, channel.receiveOrNull())
+        assertNull(channel.receiveOrNull()) // buffered elements are drained, the channel is closed
+    }
+
+    @Test
+    fun testConsumeAll() = runTest {
+        val channel = LinkedListChannel<Int>()
+        (1..10).forEach { element ->
+            channel.send(element) // buffers
+        }
+        channel.cancel()
+        check(channel.isClosedForSend)
+        check(channel.isClosedForReceive)
+        check(channel.receiveOrNull() == null)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt
new file mode 100644
index 0000000..6646aa6
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt
@@ -0,0 +1,55 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ProduceConsumeTest : TestBase() { // produce/consume round trips for several producer channel capacities
+
+    @Test
+    fun testRendezvous() = runTest {
+        testProducer(0) // capacity 0 is the unbuffered rendezvous case (was 1, which merely duplicated testSmallBuffer)
+    }
+
+    @Test
+    fun testSmallBuffer() = runTest {
+        testProducer(1)
+    }
+
+    @Test
+    fun testMediumBuffer() = runTest {
+        testProducer(10)
+    }
+
+    @Test
+    fun testLargeMediumBuffer() = runTest {
+        testProducer(1000)
+    }
+
+    @Test
+    fun testUnlimited() = runTest {
+        testProducer(Channel.UNLIMITED)
+    }
+
+    private suspend fun testProducer(producerCapacity: Int) { // runs the scenario with 1, 10 and 100 messages
+        testProducer(1, producerCapacity)
+        testProducer(10, producerCapacity)
+        testProducer(100, producerCapacity)
+    }
+
+    private suspend fun testProducer(messages: Int, producerCapacity: Int) { // sends 1..messages and counts what is consumed
+        var sentAll = false
+        val producer = produce(coroutineContext, capacity = producerCapacity) {
+            for (i in 1..messages) {
+                send(i)
+            }
+            sentAll = true // reached only when every send() has returned
+        }
+        var consumed = 0
+        for (x in producer) { // the loop ends once the producer's channel is closed
+            consumed++
+        }
+        assertTrue(sentAll)
+        assertEquals(messages, consumed)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
new file mode 100644
index 0000000..522f6d6
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ProduceTest : TestBase() { // covers normal completion and cancellation of a produce { } coroutine
+
+    @Test
+    fun testBasic() = runTest { // expect(n)/finish(n) come from TestBase; presumably they assert the numbered step order
+        val c = produce(coroutineContext) {
+            expect(2)
+            send(1)
+            expect(3)
+            send(2)
+            expect(6)
+        }
+        expect(1)
+        check(c.receive() == 1)
+        expect(4)
+        check(c.receive() == 2)
+        expect(5)
+        check(c.receiveOrNull() == null) // the producer block has finished, so null is received
+        finish(7)
+    }
+
+    @Test
+    fun testCancel() = runTest {
+        val c = produce(coroutineContext) {
+            expect(2)
+            send(1)
+            expect(3)
+            try {
+                send(2) // will get cancelled
+            } catch (e: Throwable) {
+                finish(7) // last numbered step: it runs inside the producer, after the test body reached expect(6)
+                check(e is JobCancellationException && e.job == coroutineContext[Job])
+                throw e // rethrown after the checks
+            }
+            expectUnreached()
+        }
+        expect(1)
+        check(c.receive() == 1)
+        expect(4)
+        c.cancel()
+        expect(5)
+        check(c.receiveOrNull() == null)
+        expect(6)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
new file mode 100644
index 0000000..6e1b2c3
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class RendezvousChannelTest : TestBase() { // covers send/receive hand-off, close, offer/poll, iteration and cancel of RendezvousChannel
+
+    @Test
+    fun testSimple() = runTest { // expect(n)/finish(n) come from TestBase; presumably they assert the numbered step order
+        val q = RendezvousChannel<Int>()
+        check(q.isEmpty && q.isFull) // an open rendezvous channel reports both empty and full
+        expect(1)
+        val sender = launch(coroutineContext) {
+            expect(4)
+            q.send(1) // suspend -- the first to come to rendezvous
+            expect(7)
+            q.send(2) // does not suspend -- receiver is there
+            expect(8)
+        }
+        expect(2)
+        val receiver = launch(coroutineContext) {
+            expect(5)
+            check(q.receive() == 1) // does not suspend -- sender was there
+            expect(6)
+            check(q.receive() == 2) // suspends
+            expect(9)
+        }
+        expect(3)
+        sender.join()
+        receiver.join()
+        check(q.isEmpty && q.isFull)
+        finish(10)
+    }
+
+    @Test
+    fun testClosedReceiveOrNull() = runTest {
+        val q = RendezvousChannel<Int>()
+        check(q.isEmpty && q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            assertEquals(42, q.receiveOrNull())
+            expect(4)
+            assertEquals(null, q.receiveOrNull()) // completes with null once the channel is closed
+            expect(6)
+        }
+        expect(2)
+        q.send(42)
+        expect(5)
+        q.close()
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive) // a closed channel reports neither empty nor full
+        yield()
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+        finish(7)
+    }
+
+    @Test
+    fun testClosedExceptions() = runTest { // receive() and send() on a closed channel must throw the matching Closed*ChannelException
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(4)
+            try { q.receive() }
+            catch (e: ClosedReceiveChannelException) {
+                expect(5)
+            }
+        }
+        expect(2)
+        q.close()
+        expect(3)
+        yield()
+        expect(6)
+        try { q.send(42) }
+        catch (e: ClosedSendChannelException) {
+            finish(7)
+        }
+    }
+
+    @Test
+    fun testOfferAndPool() = runTest { // NOTE(review): the name is presumably a typo for testOfferAndPoll
+        val q = RendezvousChannel<Int>()
+        assertFalse(q.offer(1)) // no receiver is waiting, so offer fails
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            assertEquals(null, q.poll())
+            expect(4)
+            assertEquals(2, q.receive())
+            expect(7)
+            assertEquals(null, q.poll())
+            yield()
+            expect(9)
+            assertEquals(3, q.poll())
+            expect(10)
+        }
+        expect(2)
+        yield()
+        expect(5)
+        assertTrue(q.offer(2)) // the launched coroutine is suspended in receive(), so offer succeeds
+        expect(6)
+        yield()
+        expect(8)
+        q.send(3)
+        finish(11)
+    }
+
+    @Test
+    fun testIteratorClosed() = runTest { // iterating a channel that is closed without elements runs the loop body zero times
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            q.close()
+            expect(4)
+        }
+        expect(2)
+        for (x in q) {
+            expectUnreached()
+        }
+        finish(5)
+    }
+
+    @Test
+    fun testIteratorOne() = runTest {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            q.send(1)
+            expect(4)
+            q.close()
+            expect(5)
+        }
+        expect(2)
+        for (x in q) {
+            expect(6)
+            assertEquals(1, x)
+        }
+        finish(7)
+    }
+
+    @Test
+    fun testIteratorOneWithYield() = runTest {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            q.send(1) // will suspend
+            expect(6)
+            q.close()
+            expect(7)
+        }
+        expect(2)
+        yield() // yield to sender coroutine right before starting for loop
+        expect(4)
+        for (x in q) {
+            expect(5)
+            assertEquals(1, x)
+        }
+        finish(8)
+    }
+
+    @Test
+    fun testIteratorTwo() = runTest {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            q.send(1)
+            expect(4)
+            q.send(2)
+            expect(7)
+            q.close()
+            expect(8)
+        }
+        expect(2)
+        for (x in q) {
+            when (x) {
+                1 -> expect(5)
+                2 -> expect(6)
+                else -> expectUnreached()
+            }
+        }
+        finish(9)
+    }
+
+    @Test
+    fun testIteratorTwoWithYield() = runTest {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            q.send(1) // will suspend
+            expect(6)
+            q.send(2)
+            expect(7)
+            q.close()
+            expect(8)
+        }
+        expect(2)
+        yield() // yield to sender coroutine right before starting for loop
+        expect(4)
+        for (x in q) {
+            when (x) {
+                1 -> expect(5)
+                2 -> expect(9)
+                else -> expectUnreached()
+            }
+        }
+        finish(10)
+    }
+
+    @Test
+    fun testSuspendSendOnClosedChannel() = runTest { // a send() suspended before close() is still delivered to a later receiver
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(4)
+            q.send(42) // suspend
+            expect(11)
+        }
+        expect(2)
+        launch(coroutineContext) {
+            expect(5)
+            q.close()
+            expect(6)
+        }
+        expect(3)
+        yield() // to sender
+        expect(7)
+        yield() // try to resume sender (it will not resume despite the close!)
+        expect(8)
+        assertEquals(42, q.receiveOrNull())
+        expect(9)
+        assertNull(q.receiveOrNull())
+        expect(10)
+        yield() // to sender, it was resumed!
+        finish(12)
+    }
+
+    class BadClass { // every overridden method throws, so the test below fails if any of them gets invoked on the element
+        override fun equals(other: Any?): Boolean = error("equals")
+        override fun hashCode(): Int = error("hashCode")
+        override fun toString(): String = error("toString")
+    }
+
+    @Test
+    fun testProduceBadClass() = runTest {
+        val bad = BadClass()
+        val c = produce(coroutineContext) {
+            expect(1)
+            send(bad)
+        }
+        assertTrue(c.receive() === bad) // identity comparison, so BadClass.equals is not called by the assertion
+        finish(2)
+    }
+
+    @Test
+    fun testConsumeAll() = runTest { // cancel() with ten suspended senders must leave the channel closed for send and receive
+        val q = RendezvousChannel<Int>()
+        for (i in 1..10) {
+            launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+                expect(i)
+                q.send(i) // suspends
+                expectUnreached() // will get cancelled by cancel
+            }
+        }
+        expect(11)
+        q.cancel()
+        check(q.isClosedForSend)
+        check(q.isClosedForReceive)
+        check(q.receiveOrNull() == null)
+        finish(12)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt
new file mode 100644
index 0000000..c85f541
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt
@@ -0,0 +1,46 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class SendReceiveStressTest : TestBase() {
+
+    // One test per channel implementation: parametrization is emulated by hand :(
+
+    @Test
+    fun testArrayChannel() = runTest {
+        testStress(ArrayChannel<Int>(2))
+    }
+
+    @Test
+    fun testLinkedListChannel() = runTest {
+        testStress(LinkedListChannel<Int>())
+    }
+
+    @Test
+    fun testRendezvousChannel() = runTest {
+        testStress(RendezvousChannel<Int>())
+    }
+
+    private suspend fun testStress(channel: Channel<Int>) { // pushes 1..messageCount through the channel and checks the order
+        val messageCount = 1_000 // Do not increase, otherwise node.js will fail with timeout :(
+        val sendJob = launch(coroutineContext) {
+            repeat(messageCount) { index ->
+                channel.send(index + 1)
+            }
+            expect(2)
+        }
+        val receiveJob = launch(coroutineContext) {
+            repeat(messageCount) { index ->
+                val received = channel.receive()
+                check(received == index + 1)
+            }
+            expect(3)
+        }
+        expect(1)
+        sendJob.join()
+        receiveJob.join()
+        finish(4)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
new file mode 100644
index 0000000..69e939f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
@@ -0,0 +1,35 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class SimpleSendReceiveTest : TestBase() {
+
+    @Test
+    fun testSimpleSendReceive() = runTest {
+        // Parametrized common test :( -- every TestChannelKind is exercised in turn
+        for (kind in TestChannelKind.values()) testSendReceive(kind, 100)
+    }
+
+    private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) {
+        val channel = kind.create()
+
+        launch(coroutineContext) {
+            for (value in 0 until iterations) channel.send(value)
+            channel.close()
+        }
+        var nextExpected = 0
+        for (received in channel) {
+            if (kind.isConflated) {
+                assertTrue(received >= nextExpected)
+                nextExpected = received + 1
+            } else {
+                assertEquals(nextExpected++, received)
+            }
+        }
+        // Conflated kinds may drop elements, so the total is only checked for the others.
+        if (kind.isConflated) return
+        assertEquals(iterations, nextExpected)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
new file mode 100644
index 0000000..60dbb97
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+enum class TestBroadcastChannelKind { // one constant per BroadcastChannel implementation/capacity
+    ARRAY_1 {
+        override fun toString(): String = "ArrayBroadcastChannel(1)"
+        override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel<T>(1)
+    },
+    ARRAY_10 {
+        override fun toString(): String = "ArrayBroadcastChannel(10)"
+        override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel<T>(10)
+    },
+    CONFLATED {
+        override val isConflated: Boolean get() = true
+        override fun toString(): String = "ConflatedBroadcastChannel"
+        override fun <T> create(): BroadcastChannel<T> = ConflatedBroadcastChannel<T>()
+    }
+    ;
+
+    open val isConflated: Boolean get() = false // overridden by CONFLATED only
+    abstract fun <T> create(): BroadcastChannel<T> // each call builds a new channel
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
new file mode 100644
index 0000000..c3ac904
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.selects.SelectClause1
+
+enum class TestChannelKind { // one constant per Channel<Int> flavour; create() builds a new channel on every call
+    RENDEZVOUS {
+        override fun create(): Channel<Int> = RendezvousChannel()
+        override fun toString(): String = "RendezvousChannel"
+    },
+    ARRAY_1 {
+        override fun create(): Channel<Int> = ArrayChannel(1)
+        override fun toString(): String = "ArrayChannel(1)"
+    },
+    ARRAY_10 { // capacity now matches the constant's name (was 8) and TestBroadcastChannelKind.ARRAY_10
+        override fun create(): Channel<Int> = ArrayChannel(10)
+        override fun toString(): String = "ArrayChannel(10)"
+    },
+    LINKED_LIST {
+        override fun create(): Channel<Int> = LinkedListChannel()
+        override fun toString(): String = "LinkedListChannel"
+    },
+    CONFLATED {
+        override fun create(): Channel<Int> = ConflatedChannel()
+        override fun toString(): String = "ConflatedChannel"
+        override val isConflated: Boolean get() = true
+    },
+    ARRAY_BROADCAST_1 { // broadcast channels are adapted to Channel<Int> through ChannelViaBroadcast
+        override fun create(): Channel<Int> = ChannelViaBroadcast(ArrayBroadcastChannel<Int>(1))
+        override fun toString(): String = "ArrayBroadcastChannel(1)"
+    },
+    ARRAY_BROADCAST_10 {
+        override fun create(): Channel<Int> = ChannelViaBroadcast(ArrayBroadcastChannel<Int>(10))
+        override fun toString(): String = "ArrayBroadcastChannel(10)"
+    },
+    CONFLATED_BROADCAST {
+        override fun create(): Channel<Int> = ChannelViaBroadcast(ConflatedBroadcastChannel<Int>())
+        override fun toString(): String = "ConflatedBroadcastChannel"
+        override val isConflated: Boolean get() = true
+    }
+    ;
+
+    abstract fun create(): Channel<Int>
+    open val isConflated: Boolean get() = false // true only for the two conflated kinds above
+}
+
+private class ChannelViaBroadcast<E>(
+    private val broadcast: BroadcastChannel<E>
+) : Channel<E>, SendChannel<E> by broadcast { // the send side is delegated to the broadcast channel
+    val sub = broadcast.openSubscription() // the receive side is one subscription opened at construction
+
+    // Workaround for KT-23094
+    override suspend fun send(element: E) { broadcast.send(element) }
+
+    // Receive-side state and select clauses are read from the subscription.
+    override val isEmpty: Boolean get() = sub.isEmpty
+    override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
+    override val onReceive: SelectClause1<E> get() = sub.onReceive
+    override val onReceiveOrNull: SelectClause1<E?> get() = sub.onReceiveOrNull
+
+    override fun poll(): E? = sub.poll()
+    override fun iterator(): ChannelIterator<E> = sub.iterator()
+    // cancel() is forwarded to the subscription only.
+    override fun cancel(cause: Throwable?): Boolean = sub.cancel(cause)
+    override suspend fun receive(): E = sub.receive()
+    override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
+}