Migrate channels and related operators to common, so channels can be used from JS
Fixes #201
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
new file mode 100644
index 0000000..84da40f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ArrayBroadcastChannelTest : TestBase() {
+
+ @Test
+ fun testBasic() = runTest {
+ expect(1)
+ val broadcast = ArrayBroadcastChannel<Int>(1)
+ assertFalse(broadcast.isClosedForSend)
+ val first = broadcast.openSubscription()
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ assertEquals(1, first.receive()) // suspends
+ assertFalse(first.isClosedForReceive)
+ expect(5)
+ assertEquals(2, first.receive()) // suspends
+ assertFalse(first.isClosedForReceive)
+ expect(10)
+ assertNull(first.receiveOrNull()) // suspends
+ assertTrue(first.isClosedForReceive)
+ expect(14)
+ }
+ expect(3)
+ broadcast.send(1)
+ expect(4)
+ yield() // to the first receiver
+ expect(6)
+
+ val second = broadcast.openSubscription()
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(7)
+ assertEquals(2, second.receive()) // suspends
+ assertFalse(second.isClosedForReceive)
+ expect(11)
+ assertNull(second.receiveOrNull()) // suspends
+ assertTrue(second.isClosedForReceive)
+ expect(15)
+ }
+ expect(8)
+ broadcast.send(2)
+ expect(9)
+ yield() // to first & second receivers
+ expect(12)
+ broadcast.close()
+ expect(13)
+ assertTrue(broadcast.isClosedForSend)
+ yield() // to first & second receivers
+ finish(16)
+ }
+
+ @Test
+ fun testSendSuspend() = runTest {
+ expect(1)
+ val broadcast = ArrayBroadcastChannel<Int>(1)
+ val first = broadcast.openSubscription()
+ launch(coroutineContext) {
+ expect(4)
+ assertEquals(1, first.receive())
+ expect(5)
+ assertEquals(2, first.receive())
+ expect(6)
+ }
+ expect(2)
+ broadcast.send(1) // puts to buffer, receiver not running yet
+ expect(3)
+ broadcast.send(2) // suspends
+ finish(7)
+ }
+
+ @Test
+ fun testConcurrentSendCompletion() = runTest {
+ expect(1)
+ val broadcast = ArrayBroadcastChannel<Int>(1)
+ val sub = broadcast.openSubscription()
+ // launch 3 concurrent senders (one goes buffer, two other suspend)
+ for (x in 1..3) {
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(x + 1)
+ broadcast.send(x)
+ }
+ }
+ // and close it for send
+ expect(5)
+ broadcast.close()
+ // now must receive all 3 items
+ expect(6)
+ assertFalse(sub.isClosedForReceive)
+ for (x in 1..3)
+ assertEquals(x, sub.receiveOrNull())
+ // and receive close signal
+ assertNull(sub.receiveOrNull())
+ assertTrue(sub.isClosedForReceive)
+ finish(7)
+ }
+
+ @Test
+ fun testForgetUnsubscribed() = runTest {
+ expect(1)
+ val broadcast = ArrayBroadcastChannel<Int>(1)
+ broadcast.send(1)
+ broadcast.send(2)
+ broadcast.send(3)
+ expect(2) // should not suspend anywhere above
+ val sub = broadcast.openSubscription()
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(3)
+ assertEquals(4, sub.receive()) // suspends
+ expect(5)
+ }
+ expect(4)
+ broadcast.send(4) // sends
+ yield()
+ finish(6)
+ }
+
+ @Test
+ fun testReceiveFullAfterClose() = runTest {
+ val channel = BroadcastChannel<Int>(10)
+ val sub = channel.openSubscription()
+ // generate into buffer & close
+ for (x in 1..5) channel.send(x)
+ channel.close()
+ // make sure all of them are consumed
+ check(!sub.isClosedForReceive)
+ for (x in 1..5) check(sub.receive() == x)
+ check(sub.receiveOrNull() == null)
+ check(sub.isClosedForReceive)
+ }
+
+ @Test
+ fun testCloseSubDuringIteration() = runTest {
+ val channel = BroadcastChannel<Int>(1)
+ // launch generator (for later) in this context
+ launch(coroutineContext) {
+ for (x in 1..5) channel.send(x)
+ channel.close()
+ }
+ // start consuming
+ val sub = channel.openSubscription()
+ var expected = 0
+ sub.consumeEach {
+ check(it == ++expected)
+ if (it == 2) {
+ sub.close()
+ }
+ }
+ check(expected == 2)
+ }
+
+ @Test
+ fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) {
+ val channel = BroadcastChannel<Int>(1)
+ val sub = channel.openSubscription()
+ assertFalse(sub.isClosedForReceive)
+ sub.close()
+ assertTrue(sub.isClosedForReceive)
+ sub.receive()
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
new file mode 100644
index 0000000..61fdaef
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ArrayChannelTest : TestBase() {
+
+ @Test
+ fun testSimple() = runTest {
+ val q = ArrayChannel<Int>(1)
+ check(q.isEmpty && !q.isFull)
+ expect(1)
+ val sender = launch(coroutineContext) {
+ expect(4)
+ q.send(1) // success -- buffered
+ check(!q.isEmpty && q.isFull)
+ expect(5)
+ q.send(2) // suspends (buffer full)
+ expect(9)
+ }
+ expect(2)
+ val receiver = launch(coroutineContext) {
+ expect(6)
+ check(q.receive() == 1) // does not suspend -- took from buffer
+ check(!q.isEmpty && q.isFull) // waiting sender's element moved to buffer
+ expect(7)
+ check(q.receive() == 2) // does not suspend (takes from sender)
+ expect(8)
+ }
+ expect(3)
+ sender.join()
+ receiver.join()
+ check(q.isEmpty && !q.isFull)
+ finish(10)
+ }
+
+ @Test
+ fun testClosedBufferedReceiveOrNull() = runTest {
+ val q = ArrayChannel<Int>(1)
+ check(q.isEmpty && !q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+ expect(1)
+ launch(coroutineContext) {
+ expect(5)
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+ assertEquals(42, q.receiveOrNull())
+ expect(6)
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+ assertEquals(null, q.receiveOrNull())
+ expect(7)
+ }
+ expect(2)
+ q.send(42) // buffers
+ expect(3)
+ q.close() // goes on
+ expect(4)
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+ yield()
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+ finish(8)
+ }
+
+ @Test
+ fun testClosedExceptions() = runTest {
+ val q = ArrayChannel<Int>(1)
+ expect(1)
+ launch(coroutineContext) {
+ expect(4)
+ try { q.receive() }
+ catch (e: ClosedReceiveChannelException) {
+ expect(5)
+ }
+ }
+ expect(2)
+
+ require(q.close())
+ expect(3)
+ yield()
+ expect(6)
+ try { q.send(42) }
+ catch (e: ClosedSendChannelException) {
+ finish(7)
+ }
+ }
+
+ @Test
+ fun testOfferAndPool() = runTest {
+ val q = ArrayChannel<Int>(1)
+ assertTrue(q.offer(1))
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ assertEquals(1, q.poll())
+ expect(4)
+ assertEquals(null, q.poll())
+ expect(5)
+ assertEquals(2, q.receive()) // suspends
+ expect(9)
+ assertEquals(3, q.poll())
+ expect(10)
+ assertEquals(null, q.poll())
+ expect(11)
+ }
+ expect(2)
+ yield()
+ expect(6)
+ assertTrue(q.offer(2))
+ expect(7)
+ assertTrue(q.offer(3))
+ expect(8)
+ assertFalse(q.offer(4))
+ yield()
+ finish(12)
+ }
+
+ @Test
+ fun testConsumeAll() = runTest {
+ val q = ArrayChannel<Int>(5)
+ for (i in 1..10) {
+ if (i <= 5) {
+ expect(i)
+ q.send(i) // shall buffer
+ } else {
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(i)
+ q.send(i) // suspends
+ expectUnreached() // will get cancelled by cancel
+ }
+ }
+ }
+ expect(11)
+ q.cancel()
+ check(q.isClosedForSend)
+ check(q.isClosedForReceive)
+ check(q.receiveOrNull() == null)
+ finish(12)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt
new file mode 100644
index 0000000..2bbc4a1
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlin.test.*
+
+
+class BroadcastChannelFactoryTest {
+
+ @Test
+ fun testRendezvousChannelNotSupported() {
+ assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(0) }
+ }
+
+ @Test
+ fun testLinkedListChannelNotSupported() {
+ assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(Channel.UNLIMITED) }
+ }
+
+ @Test
+ fun testConflatedBroadcastChannel() {
+ assertTrue { BroadcastChannel<Int>(Channel.CONFLATED) is ConflatedBroadcastChannel }
+ }
+
+ @Test
+ fun testArrayBroadcastChannel() {
+ assertTrue { BroadcastChannel<Int>(1) is ArrayBroadcastChannel }
+ assertTrue { BroadcastChannel<Int>(10) is ArrayBroadcastChannel }
+ }
+
+ @Test
+ fun testInvalidCapacityNotSupported() {
+ assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(-2) }
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt
new file mode 100644
index 0000000..d4c5126
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.test.*
+
+
+class ChannelFactoryTest : TestBase() {
+
+ @Test
+ fun testRendezvousChannel() {
+ assertTrue(Channel<Int>() is RendezvousChannel)
+ assertTrue(Channel<Int>(0) is RendezvousChannel)
+ }
+
+ @Test
+ fun testLinkedListChannel() {
+ assertTrue(Channel<Int>(Channel.UNLIMITED) is LinkedListChannel)
+ }
+
+ @Test
+ fun testConflatedChannel() {
+ assertTrue(Channel<Int>(Channel.CONFLATED) is ConflatedChannel)
+ }
+
+ @Test
+ fun testArrayChannel() {
+ assertTrue(Channel<Int>(1) is ArrayChannel)
+ assertTrue(Channel<Int>(10) is ArrayChannel)
+ }
+
+ @Test
+ fun testInvalidCapacityNotSupported() = runTest({ it is IllegalArgumentException }) {
+ Channel<Int>(-2)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
new file mode 100644
index 0000000..0fe1fc9
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
@@ -0,0 +1,569 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.math.*
+import kotlin.test.*
+
+class ChannelsTest: TestBase() {
+ private val testList = listOf(1, 2, 3)
+
+ @Test
+ fun testIterableAsReceiveChannel() = runTest {
+ assertEquals(testList, testList.asReceiveChannel().toList())
+ }
+
+ @Test
+ fun testSequenceAsReceiveChannel() = runTest {
+ assertEquals(testList, testList.asSequence().asReceiveChannel().toList())
+ }
+
+ @Test
+ fun testAssociate() = runTest {
+ assertEquals(testList.associate { it * 2 to it * 3 },
+ testList.asReceiveChannel().associate { it * 2 to it * 3 }.toMap())
+ }
+
+ @Test
+ fun testAssociateBy() = runTest {
+ assertEquals(testList.associateBy { it % 2 }, testList.asReceiveChannel().associateBy { it % 2 })
+ }
+
+ @Test
+ fun testAssociateBy2() = runTest {
+ assertEquals(testList.associateBy({ it * 2}, { it * 3 }),
+ testList.asReceiveChannel().associateBy({ it * 2}, { it * 3 }).toMap())
+ }
+
+ @Test
+ fun testDistinct() = runTest {
+ assertEquals(testList.map { it % 2 }.distinct(), testList.asReceiveChannel().map { it % 2 }.distinct().toList())
+ }
+
+ @Test
+ fun testDistinctBy() = runTest {
+ assertEquals(testList.distinctBy { it % 2 }.toList(), testList.asReceiveChannel().distinctBy { it % 2 }.toList())
+ }
+
+ @Test
+ fun testToCollection() = runTest {
+ val target = mutableListOf<Int>()
+ testList.asReceiveChannel().toCollection(target)
+ assertEquals(testList, target)
+ }
+
+ @Test
+ fun testDrop() = runTest {
+ for (i in 0..testList.size) {
+ assertEquals(testList.drop(i), testList.asReceiveChannel().drop(i).toList(), "Drop $i")
+ }
+ }
+
+ @Test
+ fun testElementAtOrElse() = runTest {
+ assertEquals(testList.elementAtOrElse(2) { 42 }, testList.asReceiveChannel().elementAtOrElse(2) { 42 })
+ assertEquals(testList.elementAtOrElse(9) { 42 }, testList.asReceiveChannel().elementAtOrElse(9) { 42 })
+ }
+
+ @Test
+ fun testFirst() = runTest {
+ assertEquals(testList.first(), testList.asReceiveChannel().first())
+ for (i in testList) {
+ assertEquals(testList.first { it == i }, testList.asReceiveChannel().first { it == i })
+ }
+ try {
+ testList.asReceiveChannel().first { it == 9 }
+ fail()
+ } catch (nse: NoSuchElementException) {
+ }
+ }
+
+ @Test
+ fun testFirstOrNull() = runTest {
+ assertEquals(testList.firstOrNull(), testList.asReceiveChannel().firstOrNull())
+ assertEquals(testList.firstOrNull { it == 2 }, testList.asReceiveChannel().firstOrNull { it == 2 })
+ assertEquals(testList.firstOrNull { it == 9 }, testList.asReceiveChannel().firstOrNull { it == 9 })
+ }
+
+ @Test
+ fun testFlatMap() = runTest {
+ assertEquals(testList.flatMap { (0..it).toList() }, testList.asReceiveChannel().flatMap { (0..it).asReceiveChannel() }.toList())
+
+ }
+
+ @Test
+ fun testFold() = runTest {
+ assertEquals(testList.fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } },
+ testList.asReceiveChannel().fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } }.toList())
+ }
+
+ @Test
+ fun testFoldIndexed() = runTest {
+ assertEquals(testList.foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } },
+ testList.asReceiveChannel().foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } }.toList())
+ }
+
+ @Test
+ fun testGroupBy() = runTest {
+ assertEquals(testList.groupBy { it % 2 }, testList.asReceiveChannel().groupBy { it % 2 })
+ }
+
+ @Test
+ fun testGroupBy2() = runTest {
+ assertEquals(testList.groupBy({ -it }, { it + 100 }), testList.asReceiveChannel().groupBy({ -it }, { it + 100 }).toMap())
+
+ }
+
+ @Test
+ fun testMap() = runTest {
+ assertEquals(testList.map { it + 10 }, testList.asReceiveChannel().map { it + 10 }.toList())
+
+ }
+
+ @Test
+ fun testMapToCollection() = runTest {
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().mapTo(c) { it + 10 }
+ assertEquals(testList.map { it + 10 }, c)
+ }
+
+ @Test
+ fun testMapToSendChannel() = runTest {
+ val c = produce<Int> {
+ testList.asReceiveChannel().mapTo(channel) { it + 10 }
+ }
+ assertEquals(testList.map { it + 10 }, c.toList())
+ }
+
+ @Test
+ fun testEmptyList() = runTest {
+ assertTrue(emptyList<Nothing>().asReceiveChannel().toList().isEmpty())
+ }
+
+ @Test
+ fun testToList() = runTest {
+ assertEquals(testList, testList.asReceiveChannel().toList())
+
+ }
+
+ @Test
+ fun testEmptySet() = runTest {
+ assertTrue(emptyList<Nothing>().asReceiveChannel().toSet().isEmpty())
+
+ }
+
+ @Test
+ fun testToSet() = runTest {
+ assertEquals(testList.toSet(), testList.asReceiveChannel().toSet())
+ }
+
+ @Test
+ fun testToMutableSet() = runTest {
+ assertEquals(testList.toMutableSet(), testList.asReceiveChannel().toMutableSet())
+ }
+
+ @Test
+ fun testEmptySequence() = runTest {
+ val channel = Channel<Nothing>()
+ channel.close()
+
+ assertTrue(emptyList<Nothing>().asReceiveChannel().count() == 0)
+ }
+
+ @Test
+ fun testEmptyMap() = runTest {
+ val channel = Channel<Pair<Nothing, Nothing>>()
+ channel.close()
+
+ assertTrue(channel.toMap().isEmpty())
+ }
+
+ @Test
+ fun testToMap() = runTest {
+ val values = testList.map { it to it.toString() }
+ assertEquals(values.toMap(), values.asReceiveChannel().toMap())
+ }
+
+ @Test
+ fun testReduce() = runTest {
+ assertEquals(testList.reduce { acc, e -> acc * e },
+ testList.asReceiveChannel().reduce { acc, e -> acc * e })
+ }
+
+ @Test
+ fun testReduceIndexed() = runTest {
+ assertEquals(testList.reduceIndexed { index, acc, e -> index + acc * e },
+ testList.asReceiveChannel().reduceIndexed { index, acc, e -> index + acc * e })
+ }
+
+ @Test
+ fun testTake() = runTest {
+ for (i in 0..testList.size) {
+ assertEquals(testList.take(i), testList.asReceiveChannel().take(i).toList())
+ }
+ }
+
+ @Test
+ fun testPartition() = runTest {
+ assertEquals(testList.partition { it % 2 == 0 }, testList.asReceiveChannel().partition { it % 2 == 0 })
+ }
+
+ @Test
+ fun testZip() = runTest {
+ val other = listOf("a", "b")
+ assertEquals(testList.zip(other), testList.asReceiveChannel().zip(other.asReceiveChannel()).toList())
+ }
+
+ @Test
+ fun testElementAt() = runTest {
+ testList.indices.forEach { i ->
+ assertEquals(testList[i], testList.asReceiveChannel().elementAt(i))
+ }
+ }
+
+ @Test
+ fun testElementAtOrNull() = runTest {
+ testList.indices.forEach { i ->
+ assertEquals(testList[i], testList.asReceiveChannel().elementAtOrNull(i))
+ }
+ assertEquals(null, testList.asReceiveChannel().elementAtOrNull(-1))
+ assertEquals(null, testList.asReceiveChannel().elementAtOrNull(testList.size))
+ }
+
+ @Test
+ fun testFind() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.find { it % 2 == mod },
+ testList.asReceiveChannel().find { it % 2 == mod })
+ }
+ }
+
+ @Test
+ fun testFindLast() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.findLast { it % 2 == mod }, testList.asReceiveChannel().findLast { it % 2 == mod })
+ }
+ }
+
+ @Test
+ fun testIndexOf() = runTest {
+ repeat(testList.size + 1) { i ->
+ assertEquals(testList.indexOf(i), testList.asReceiveChannel().indexOf(i))
+ }
+ }
+
+ @Test
+ fun testLastIndexOf() = runTest {
+ repeat(testList.size + 1) { i ->
+ assertEquals(testList.lastIndexOf(i), testList.asReceiveChannel().lastIndexOf(i))
+ }
+ }
+
+ @Test
+ fun testIndexOfFirst() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.indexOfFirst { it % 2 == mod },
+ testList.asReceiveChannel().indexOfFirst { it % 2 == mod })
+ }
+ }
+
+ @Test
+ fun testIndexOfLast() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.indexOfLast { it % 2 != mod },
+ testList.asReceiveChannel().indexOfLast { it % 2 != mod })
+ }
+ }
+
+ @Test
+ fun testLastOrNull() = runTest {
+ assertEquals(testList.lastOrNull(), testList.asReceiveChannel().lastOrNull())
+ assertEquals(null, emptyList<Int>().asReceiveChannel().lastOrNull())
+ }
+
+ @Test
+ fun testSingleOrNull() = runTest {
+ assertEquals(1, listOf(1).asReceiveChannel().singleOrNull())
+ assertEquals(null, listOf(1, 2).asReceiveChannel().singleOrNull())
+ assertEquals(null, emptyList<Int>().asReceiveChannel().singleOrNull())
+ repeat(testList.size + 1) { i ->
+ assertEquals(testList.singleOrNull { it == i },
+ testList.asReceiveChannel().singleOrNull { it == i })
+ }
+ repeat(3) { mod ->
+ assertEquals(testList.singleOrNull { it % 2 == mod },
+ testList.asReceiveChannel().singleOrNull { it % 2 == mod })
+ }
+ }
+
+ @Test
+ fun testDropWhile() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.dropWhile { it % 2 == mod },
+ testList.asReceiveChannel().dropWhile { it % 2 == mod }.toList())
+ }
+ }
+
+ @Test
+ fun testFilter() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.filter { it % 2 == mod },
+ testList.asReceiveChannel().filter { it % 2 == mod }.toList())
+ }
+ }
+
+ @Test
+ fun testFilterToCollection() = runTest {
+ repeat(3) { mod ->
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().filterTo(c) { it % 2 == mod }
+ assertEquals(testList.filter { it % 2 == mod }, c)
+ }
+ }
+
+ @Test
+ fun testFilterToSendChannel() = runTest {
+ repeat(3) { mod ->
+ val c = produce<Int> {
+ testList.asReceiveChannel().filterTo(channel) { it % 2 == mod }
+ }
+ assertEquals(testList.filter { it % 2 == mod }, c.toList())
+ }
+ }
+
+ @Test
+ fun testFilterNot() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.filterNot { it % 2 == mod },
+ testList.asReceiveChannel().filterNot { it % 2 == mod }.toList())
+ }
+ }
+
+ @Test
+ fun testFilterNotToCollection() = runTest {
+ repeat(3) { mod ->
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().filterNotTo(c) { it % 2 == mod }
+ assertEquals(testList.filterNot { it % 2 == mod }, c)
+ }
+ }
+
+ @Test
+ fun testFilterNotToSendChannel() = runTest {
+ repeat(3) { mod ->
+ val c = produce<Int> {
+ testList.asReceiveChannel().filterNotTo(channel) { it % 2 == mod }
+ }
+ assertEquals(testList.filterNot { it % 2 == mod }, c.toList())
+ }
+ }
+
+ @Test
+ fun testFilterNotNull() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(),
+ testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNull().toList())
+ }
+ }
+
+ @Test
+ fun testFilterNotNullToCollection() = runTest {
+ repeat(3) { mod ->
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(c)
+ assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(), c)
+ }
+ }
+
+ @Test
+ fun testFilterNotNullToSendChannel() = runTest {
+ repeat(3) { mod ->
+ val c = produce<Int> {
+ testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(channel)
+ }
+ assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(), c.toList())
+ }
+ }
+
+ @Test
+ fun testFilterIndexed() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod },
+ testList.asReceiveChannel().filterIndexed { index, _ -> index % 2 == mod }.toList())
+ }
+ }
+
+ @Test
+ fun testFilterIndexedToCollection() = runTest {
+ repeat(3) { mod ->
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().filterIndexedTo(c) { index, _ -> index % 2 == mod }
+ assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod }, c)
+ }
+ }
+
+ @Test
+ fun testFilterIndexedToChannel() = runTest {
+ repeat(3) { mod ->
+ val c = produce<Int> {
+ testList.asReceiveChannel().filterIndexedTo(channel) { index, _ -> index % 2 == mod }
+ }
+ assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod }, c.toList())
+ }
+ }
+
+ @Test
+ fun testTakeWhile() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.takeWhile { it % 2 != mod },
+ testList.asReceiveChannel().takeWhile { it % 2 != mod }.toList())
+ }
+ }
+
+ @Test
+ fun testToChannel() = runTest {
+ val c = produce<Int> {
+ testList.asReceiveChannel().toChannel(channel)
+ }
+ assertEquals(testList, c.toList())
+ }
+
+ @Test
+ fun testMapIndexed() = runTest {
+ assertEquals(testList.mapIndexed { index, i -> index + i },
+ testList.asReceiveChannel().mapIndexed { index, i -> index + i }.toList())
+ }
+
+ @Test
+ fun testMapIndexedToCollection() = runTest {
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().mapIndexedTo(c) { index, i -> index + i }
+ assertEquals(testList.mapIndexed { index, i -> index + i }, c)
+ }
+
+ @Test
+ fun testMapIndexedToSendChannel() = runTest {
+ val c = produce<Int> {
+ testList.asReceiveChannel().mapIndexedTo(channel) { index, i -> index + i }
+ }
+ assertEquals(testList.mapIndexed { index, i -> index + i }, c.toList())
+ }
+
+ @Test
+ fun testMapNotNull() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } },
+ testList.asReceiveChannel().mapNotNull { i -> i.takeIf { i % 2 == mod } }.toList())
+ }
+ }
+
+ @Test
+ fun testMapNotNullToCollection() = runTest {
+ repeat(3) { mod ->
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().mapNotNullTo(c) { i -> i.takeIf { i % 2 == mod } }
+ assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, c)
+ }
+ }
+
+ @Test
+ fun testMapNotNullToSendChannel() = runTest {
+ repeat(3) { mod ->
+ val c = produce<Int> {
+ testList.asReceiveChannel().mapNotNullTo(channel) { i -> i.takeIf { i % 2 == mod } }
+ }
+ assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, c.toList())
+ }
+ }
+
+ @Test
+ fun testMapIndexedNotNull() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } },
+ testList.asReceiveChannel().mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }.toList())
+ }
+ }
+
+ @Test
+ fun testMapIndexedNotNullToCollection() = runTest {
+ repeat(3) { mod ->
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().mapIndexedNotNullTo(c) { index, i -> index.takeIf { i % 2 == mod } }
+ assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, c)
+ }
+ }
+
+ @Test
+ fun testMapIndexedNotNullToSendChannel() = runTest {
+ repeat(3) { mod ->
+ val c = produce<Int> {
+ testList.asReceiveChannel().mapIndexedNotNullTo(channel) { index, i -> index.takeIf { i % 2 == mod } }
+ }
+ assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, c.toList())
+ }
+ }
+
+ @Test
+ fun testWithIndex() = runTest {
+ assertEquals(testList.withIndex().toList(), testList.asReceiveChannel().withIndex().toList())
+ }
+
+ @Test
+ fun testMaxBy() = runTest {
+ assertEquals(testList.maxBy { 10 - abs(it - 2) },
+ testList.asReceiveChannel().maxBy { 10 - abs(it - 2) })
+ }
+
+ @Test
+ fun testMaxWith() = runTest {
+ val cmp = compareBy<Int> { 10 - abs(it - 2) }
+ assertEquals(testList.maxWith(cmp),
+ testList.asReceiveChannel().maxWith(cmp))
+ }
+
+ @Test
+ fun testMinBy() = runTest {
+ assertEquals(testList.minBy { abs(it - 2) },
+ testList.asReceiveChannel().minBy { abs(it - 2) })
+ }
+
+ @Test
+ fun testMinWith() = runTest {
+ val cmp = compareBy<Int> { abs(it - 2) }
+ assertEquals(testList.minWith(cmp),
+ testList.asReceiveChannel().minWith(cmp))
+ }
+
+ @Test
+ fun testSumBy() = runTest {
+ assertEquals(testList.sumBy { it * 3 },
+ testList.asReceiveChannel().sumBy { it * 3 })
+ }
+
+ @Test
+ fun testSumByDouble() = runTest {
+ val expected = testList.sumByDouble { it * 3.0 }
+ val actual = testList.asReceiveChannel().sumByDouble { it * 3.0 }
+ assertEquals(expected, actual)
+ }
+
+ @Test
+ fun testRequireNoNulls() = runTest {
+ assertEquals(testList.requireNoNulls(), testList.asReceiveChannel().requireNoNulls().toList())
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt
new file mode 100644
index 0000000..4c04f8f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ConflatedBroadcastChannelTest : TestBase() {
+
+ @Test
+ fun testBasicScenario() = runTest {
+ expect(1)
+ val broadcast = ConflatedBroadcastChannel<String>()
+ assertTrue(exceptionFromNotInline { broadcast.value } is IllegalStateException)
+ assertNull(broadcast.valueOrNull)
+
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ val sub = broadcast.openSubscription()
+ assertNull(sub.poll())
+ expect(3)
+ assertEquals("one", sub.receive()) // suspends
+ expect(6)
+ assertEquals("two", sub.receive()) // suspends
+ expect(12)
+ sub.close()
+ expect(13)
+ }
+
+ expect(4)
+ broadcast.send("one") // does not suspend
+ assertEquals("one", broadcast.value)
+ assertEquals("one", broadcast.valueOrNull)
+ expect(5)
+ yield() // to receiver
+ expect(7)
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(8)
+ val sub = broadcast.openSubscription()
+ assertEquals("one", sub.receive()) // does not suspend
+ expect(9)
+ assertEquals("two", sub.receive()) // suspends
+ expect(14)
+ assertEquals("three", sub.receive()) // suspends
+ expect(17)
+ assertNull(sub.receiveOrNull()) // suspends until closed
+ expect(20)
+ sub.close()
+ expect(21)
+ }
+
+ expect(10)
+ broadcast.send("two") // does not suspend
+ assertEquals("two", broadcast.value)
+ assertEquals("two", broadcast.valueOrNull)
+ expect(11)
+ yield() // to both receivers
+ expect(15)
+ broadcast.send("three") // does not suspend
+ assertEquals("three", broadcast.value)
+ assertEquals("three", broadcast.valueOrNull)
+ expect(16)
+ yield() // to second receiver
+ expect(18)
+ broadcast.close()
+ assertTrue(exceptionFromNotInline { broadcast.value } is IllegalStateException)
+ assertNull(broadcast.valueOrNull)
+ expect(19)
+ yield() // to second receiver
+ assertTrue(exceptionFrom { broadcast.send("four") } is ClosedSendChannelException)
+ finish(22)
+ }
+
+ @Test
+ fun testInitialValueAndReceiveClosed() = runTest {
+ expect(1)
+ val broadcast = ConflatedBroadcastChannel(1)
+ assertEquals(1, broadcast.value)
+ assertEquals(1, broadcast.valueOrNull)
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ val sub = broadcast.openSubscription()
+ assertEquals(1, sub.receive())
+ expect(3)
+ assertTrue(exceptionFrom { sub.receive() } is ClosedReceiveChannelException) // suspends
+ expect(6)
+ }
+ expect(4)
+ broadcast.close()
+ expect(5)
+ yield() // to child
+ finish(7)
+ }
+
+ inline fun exceptionFrom(block: () -> Unit): Throwable? {
+ try {
+ block()
+ return null
+ } catch (e: Throwable) {
+ return e
+ }
+ }
+
+ // Ugly workaround for bug in JS compiler
+ fun exceptionFromNotInline(block: () -> Unit): Throwable? {
+ try {
+ block()
+ return null
+ } catch (e: Throwable) {
+ return e
+ }
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
new file mode 100644
index 0000000..1fd7413
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ConflatedChannelTest : TestBase() {
+
+ @Test
+ fun testBasicConflationOfferPoll() {
+ val q = ConflatedChannel<Int>()
+ assertNull(q.poll())
+ assertTrue(q.offer(1))
+ assertTrue(q.offer(2))
+ assertTrue(q.offer(3))
+ assertEquals(3, q.poll())
+ assertNull(q.poll())
+ }
+
+ @Test
+ fun testConflatedSend() = runTest {
+ val q = ConflatedChannel<Int>()
+ q.send(1)
+ q.send(2) // shall conflated previously sent
+ assertEquals(2, q.receiveOrNull())
+ }
+
+ @Test
+ fun testConflatedClose() = runTest {
+ val q = ConflatedChannel<Int>()
+ q.send(1)
+ q.close() // shall conflate sent item and become closed
+ assertNull(q.receiveOrNull())
+ }
+
+ @Test
+ fun testConflationSendReceive() = runTest {
+ val q = ConflatedChannel<Int>()
+ expect(1)
+ launch(coroutineContext) { // receiver coroutine
+ expect(4)
+ assertEquals(2, q.receive())
+ expect(5)
+ assertEquals(3, q.receive()) // this receive suspends
+ expect(8)
+ assertEquals(6, q.receive()) // last conflated value
+ expect(9)
+ }
+ expect(2)
+ q.send(1)
+ q.send(2) // shall conflate
+ expect(3)
+ yield() // to receiver
+ expect(6)
+ q.send(3) // send to the waiting receiver
+ q.send(4) // buffer
+ q.send(5) // conflate
+ q.send(6) // conflate again
+ expect(7)
+ yield() // to receiver
+ finish(10)
+ }
+
+ @Test
+ fun testConsumeAll() = runTest {
+ val q = ConflatedChannel<Int>()
+ expect(1)
+ for (i in 1..10) {
+ q.send(i) // stores as last
+ }
+ q.cancel()
+ check(q.isClosedForSend)
+ check(q.isClosedForReceive)
+ check(q.receiveOrNull() == null)
+ finish(2)
+ }
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
new file mode 100644
index 0000000..897801e
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.TestBase
+import kotlin.test.*
+
+class LinkedListChannelTest : TestBase() {
+
+ @Test
+ fun testBasic() = runTest {
+ val c = LinkedListChannel<Int>()
+ c.send(1)
+ check(c.offer(2))
+ c.send(3)
+ check(c.close())
+ check(!c.close())
+ assertEquals(1, c.receive())
+ assertEquals(2, c.poll())
+ assertEquals(3, c.receiveOrNull())
+ assertNull(c.receiveOrNull())
+ }
+
+ @Test
+ fun testConsumeAll() = runTest {
+ val q = LinkedListChannel<Int>()
+ for (i in 1..10) {
+ q.send(i) // buffers
+ }
+ q.cancel()
+ check(q.isClosedForSend)
+ check(q.isClosedForReceive)
+ check(q.receiveOrNull() == null)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt
new file mode 100644
index 0000000..6646aa6
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt
@@ -0,0 +1,55 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ProduceConsumeTest : TestBase() {
+
+ @Test
+ fun testRendezvous() = runTest {
+ testProducer(1)
+ }
+
+ @Test
+ fun testSmallBuffer() = runTest {
+ testProducer(1)
+ }
+
+ @Test
+ fun testMediumBuffer() = runTest {
+ testProducer(10)
+ }
+
+ @Test
+ fun testLargeMediumBuffer() = runTest {
+ testProducer(1000)
+ }
+
+ @Test
+ fun testUnlimited() = runTest {
+ testProducer(Channel.UNLIMITED)
+ }
+
+ private suspend fun testProducer(producerCapacity: Int) {
+ testProducer(1, producerCapacity)
+ testProducer(10, producerCapacity)
+ testProducer(100, producerCapacity)
+ }
+
+ private suspend fun testProducer(messages: Int, producerCapacity: Int) {
+ var sentAll = false
+ val producer = produce(coroutineContext, capacity = producerCapacity) {
+ for (i in 1..messages) {
+ send(i)
+ }
+ sentAll = true
+ }
+ var consumed = 0
+ for (x in producer) {
+ consumed++
+ }
+ assertTrue(sentAll)
+ assertEquals(messages, consumed)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
new file mode 100644
index 0000000..522f6d6
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ProduceTest : TestBase() {
+
+ @Test
+ fun testBasic() = runTest {
+ val c = produce(coroutineContext) {
+ expect(2)
+ send(1)
+ expect(3)
+ send(2)
+ expect(6)
+ }
+ expect(1)
+ check(c.receive() == 1)
+ expect(4)
+ check(c.receive() == 2)
+ expect(5)
+ check(c.receiveOrNull() == null)
+ finish(7)
+ }
+
+ @Test
+ fun testCancel() = runTest {
+ val c = produce(coroutineContext) {
+ expect(2)
+ send(1)
+ expect(3)
+ try {
+ send(2) // will get cancelled
+ } catch (e: Throwable) {
+ finish(7)
+ check(e is JobCancellationException && e.job == coroutineContext[Job])
+ throw e
+ }
+ expectUnreached()
+ }
+ expect(1)
+ check(c.receive() == 1)
+ expect(4)
+ c.cancel()
+ expect(5)
+ check(c.receiveOrNull() == null)
+ expect(6)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
new file mode 100644
index 0000000..6e1b2c3
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class RendezvousChannelTest : TestBase() {
+
+ @Test
+ fun testSimple() = runTest {
+ val q = RendezvousChannel<Int>()
+ check(q.isEmpty && q.isFull)
+ expect(1)
+ val sender = launch(coroutineContext) {
+ expect(4)
+ q.send(1) // suspend -- the first to come to rendezvous
+ expect(7)
+ q.send(2) // does not suspend -- receiver is there
+ expect(8)
+ }
+ expect(2)
+ val receiver = launch(coroutineContext) {
+ expect(5)
+ check(q.receive() == 1) // does not suspend -- sender was there
+ expect(6)
+ check(q.receive() == 2) // suspends
+ expect(9)
+ }
+ expect(3)
+ sender.join()
+ receiver.join()
+ check(q.isEmpty && q.isFull)
+ finish(10)
+ }
+
+ @Test
+ fun testClosedReceiveOrNull() = runTest {
+ val q = RendezvousChannel<Int>()
+ check(q.isEmpty && q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ assertEquals(42, q.receiveOrNull())
+ expect(4)
+ assertEquals(null, q.receiveOrNull())
+ expect(6)
+ }
+ expect(2)
+ q.send(42)
+ expect(5)
+ q.close()
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+ yield()
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+ finish(7)
+ }
+
+ @Test
+ fun testClosedExceptions() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(4)
+ try { q.receive() }
+ catch (e: ClosedReceiveChannelException) {
+ expect(5)
+ }
+ }
+ expect(2)
+ q.close()
+ expect(3)
+ yield()
+ expect(6)
+ try { q.send(42) }
+ catch (e: ClosedSendChannelException) {
+ finish(7)
+ }
+ }
+
+ @Test
+ fun testOfferAndPool() = runTest {
+ val q = RendezvousChannel<Int>()
+ assertFalse(q.offer(1))
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ assertEquals(null, q.poll())
+ expect(4)
+ assertEquals(2, q.receive())
+ expect(7)
+ assertEquals(null, q.poll())
+ yield()
+ expect(9)
+ assertEquals(3, q.poll())
+ expect(10)
+ }
+ expect(2)
+ yield()
+ expect(5)
+ assertTrue(q.offer(2))
+ expect(6)
+ yield()
+ expect(8)
+ q.send(3)
+ finish(11)
+ }
+
+ @Test
+ fun testIteratorClosed() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ q.close()
+ expect(4)
+ }
+ expect(2)
+ for (x in q) {
+ expectUnreached()
+ }
+ finish(5)
+ }
+
+ @Test
+ fun testIteratorOne() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ q.send(1)
+ expect(4)
+ q.close()
+ expect(5)
+ }
+ expect(2)
+ for (x in q) {
+ expect(6)
+ assertEquals(1, x)
+ }
+ finish(7)
+ }
+
+ @Test
+ fun testIteratorOneWithYield() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ q.send(1) // will suspend
+ expect(6)
+ q.close()
+ expect(7)
+ }
+ expect(2)
+ yield() // yield to sender coroutine right before starting for loop
+ expect(4)
+ for (x in q) {
+ expect(5)
+ assertEquals(1, x)
+ }
+ finish(8)
+ }
+
+ @Test
+ fun testIteratorTwo() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ q.send(1)
+ expect(4)
+ q.send(2)
+ expect(7)
+ q.close()
+ expect(8)
+ }
+ expect(2)
+ for (x in q) {
+ when (x) {
+ 1 -> expect(5)
+ 2 -> expect(6)
+ else -> expectUnreached()
+ }
+ }
+ finish(9)
+ }
+
+ @Test
+ fun testIteratorTwoWithYield() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ q.send(1) // will suspend
+ expect(6)
+ q.send(2)
+ expect(7)
+ q.close()
+ expect(8)
+ }
+ expect(2)
+ yield() // yield to sender coroutine right before starting for loop
+ expect(4)
+ for (x in q) {
+ when (x) {
+ 1 -> expect(5)
+ 2 -> expect(9)
+ else -> expectUnreached()
+ }
+ }
+ finish(10)
+ }
+
+ @Test
+ fun testSuspendSendOnClosedChannel() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(4)
+ q.send(42) // suspend
+ expect(11)
+ }
+ expect(2)
+ launch(coroutineContext) {
+ expect(5)
+ q.close()
+ expect(6)
+ }
+ expect(3)
+ yield() // to sender
+ expect(7)
+ yield() // try to resume sender (it will not resume despite the close!)
+ expect(8)
+ assertEquals(42, q.receiveOrNull())
+ expect(9)
+ assertNull(q.receiveOrNull())
+ expect(10)
+ yield() // to sender, it was resumed!
+ finish(12)
+ }
+
+ class BadClass {
+ override fun equals(other: Any?): Boolean = error("equals")
+ override fun hashCode(): Int = error("hashCode")
+ override fun toString(): String = error("toString")
+ }
+
+ @Test
+ fun testProduceBadClass() = runTest {
+ val bad = BadClass()
+ val c = produce(coroutineContext) {
+ expect(1)
+ send(bad)
+ }
+ assertTrue(c.receive() === bad)
+ finish(2)
+ }
+
+ @Test
+ fun testConsumeAll() = runTest {
+ val q = RendezvousChannel<Int>()
+ for (i in 1..10) {
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(i)
+ q.send(i) // suspends
+ expectUnreached() // will get cancelled by cancel
+ }
+ }
+ expect(11)
+ q.cancel()
+ check(q.isClosedForSend)
+ check(q.isClosedForReceive)
+ check(q.receiveOrNull() == null)
+ finish(12)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt
new file mode 100644
index 0000000..c85f541
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt
@@ -0,0 +1,46 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class SendReceiveStressTest : TestBase() {
+
+ // Emulate parametrized by hand :(
+
+ @Test
+ fun testArrayChannel() = runTest {
+ testStress(ArrayChannel(2))
+ }
+
+ @Test
+ fun testLinkedListChannel() = runTest {
+ testStress(LinkedListChannel())
+ }
+
+ @Test
+ fun testRendezvousChannel() = runTest {
+ testStress(RendezvousChannel())
+ }
+
+ private suspend fun testStress(channel: Channel<Int>) {
+ val n = 1_000 // Do not increase, otherwise node.js will fail with timeout :(
+ val sender = launch(coroutineContext) {
+ for (i in 1..n) {
+ channel.send(i)
+ }
+ expect(2)
+ }
+ val receiver = launch(coroutineContext) {
+ for (i in 1..n) {
+ val next = channel.receive()
+ check(next == i)
+ }
+ expect(3)
+ }
+ expect(1)
+ sender.join()
+ receiver.join()
+ finish(4)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
new file mode 100644
index 0000000..69e939f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
@@ -0,0 +1,35 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class SimpleSendReceiveTest : TestBase() {
+
+ @Test
+ fun testSimpleSendReceive() = runTest {
+ // Parametrized common test :(
+ TestChannelKind.values().forEach { kind -> testSendReceive(kind, 100) }
+ }
+
+ private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) {
+ val channel = kind.create()
+
+ launch(coroutineContext) {
+ repeat(iterations) { channel.send(it) }
+ channel.close()
+ }
+ var expected = 0
+ for (x in channel) {
+ if (!kind.isConflated) {
+ assertEquals(expected++, x)
+ } else {
+ assertTrue(x >= expected)
+ expected = x + 1
+ }
+ }
+ if (!kind.isConflated) {
+ assertEquals(iterations, expected)
+ }
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
new file mode 100644
index 0000000..60dbb97
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+enum class TestBroadcastChannelKind {
+ ARRAY_1 {
+ override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel(1)
+ override fun toString(): String = "ArrayBroadcastChannel(1)"
+ },
+ ARRAY_10 {
+ override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel(10)
+ override fun toString(): String = "ArrayBroadcastChannel(10)"
+ },
+ CONFLATED {
+ override fun <T> create(): BroadcastChannel<T> = ConflatedBroadcastChannel()
+ override fun toString(): String = "ConflatedBroadcastChannel"
+ override val isConflated: Boolean get() = true
+ }
+ ;
+
+ abstract fun <T> create(): BroadcastChannel<T>
+ open val isConflated: Boolean get() = false
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
new file mode 100644
index 0000000..c3ac904
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.selects.SelectClause1
+
+enum class TestChannelKind {
+ RENDEZVOUS {
+ override fun create(): Channel<Int> = RendezvousChannel()
+ override fun toString(): String = "RendezvousChannel"
+ },
+ ARRAY_1 {
+ override fun create(): Channel<Int> = ArrayChannel(1)
+ override fun toString(): String = "ArrayChannel(1)"
+ },
+ ARRAY_10 {
+ override fun create(): Channel<Int> = ArrayChannel(8)
+ override fun toString(): String = "ArrayChannel(8)"
+ },
+ LINKED_LIST {
+ override fun create(): Channel<Int> = LinkedListChannel()
+ override fun toString(): String = "LinkedListChannel"
+ },
+ CONFLATED {
+ override fun create(): Channel<Int> = ConflatedChannel()
+ override fun toString(): String = "ConflatedChannel"
+ override val isConflated: Boolean get() = true
+ },
+ ARRAY_BROADCAST_1 {
+ override fun create(): Channel<Int> = ChannelViaBroadcast(ArrayBroadcastChannel<Int>(1))
+ override fun toString(): String = "ArrayBroadcastChannel(1)"
+ },
+ ARRAY_BROADCAST_10 {
+ override fun create(): Channel<Int> = ChannelViaBroadcast(ArrayBroadcastChannel<Int>(10))
+ override fun toString(): String = "ArrayBroadcastChannel(10)"
+ },
+ CONFLATED_BROADCAST {
+ override fun create(): Channel<Int> = ChannelViaBroadcast(ConflatedBroadcastChannel<Int>())
+ override fun toString(): String = "ConflatedBroadcastChannel"
+ override val isConflated: Boolean get() = true
+ }
+ ;
+
+ abstract fun create(): Channel<Int>
+ open val isConflated: Boolean get() = false
+}
+
+private class ChannelViaBroadcast<E>(
+ private val broadcast: BroadcastChannel<E>
+): Channel<E>, SendChannel<E> by broadcast {
+ val sub = broadcast.openSubscription()
+
+ override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
+ override val isEmpty: Boolean get() = sub.isEmpty
+
+ // Workaround for KT-23094
+ override suspend fun send(element: E) = broadcast.send(element)
+
+ override suspend fun receive(): E = sub.receive()
+ override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
+ override fun poll(): E? = sub.poll()
+ override fun iterator(): ChannelIterator<E> = sub.iterator()
+ override fun cancel(cause: Throwable?): Boolean = sub.cancel(cause)
+ override val onReceive: SelectClause1<E>
+ get() = sub.onReceive
+ override val onReceiveOrNull: SelectClause1<E?>
+ get() = sub.onReceiveOrNull
+}