| /* |
| * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| */ |
| |
| package kotlinx.coroutines.experimental.channels |
| |
| import kotlinx.coroutines.experimental.* |
| import kotlin.coroutines.experimental.* |
| import kotlin.test.* |
| |
| class ArrayBroadcastChannelTest : TestBase() { |
| |
| @Test |
| fun testConcurrentModification() = runTest { |
| val channel = ArrayBroadcastChannel<Int>(1) |
| val s1 = channel.openSubscription() |
| val s2 = channel.openSubscription() |
| |
| val job1 = launch(Unconfined, CoroutineStart.UNDISPATCHED) { |
| expect(1) |
| s1.receive() |
| s1.cancel() |
| } |
| |
| val job2 = launch(Unconfined, CoroutineStart.UNDISPATCHED) { |
| expect(2) |
| s2.receive() |
| } |
| |
| expect(3) |
| channel.send(1) |
| joinAll(job1, job2) |
| finish(4) |
| } |
| |
| @Test |
| fun testBasic() = runTest { |
| expect(1) |
| val broadcast = ArrayBroadcastChannel<Int>(1) |
| assertFalse(broadcast.isClosedForSend) |
| val first = broadcast.openSubscription() |
| launch(coroutineContext, CoroutineStart.UNDISPATCHED) { |
| expect(2) |
| assertEquals(1, first.receive()) // suspends |
| assertFalse(first.isClosedForReceive) |
| expect(5) |
| assertEquals(2, first.receive()) // suspends |
| assertFalse(first.isClosedForReceive) |
| expect(10) |
| assertNull(first.receiveOrNull()) // suspends |
| assertTrue(first.isClosedForReceive) |
| expect(14) |
| } |
| expect(3) |
| broadcast.send(1) |
| expect(4) |
| yield() // to the first receiver |
| expect(6) |
| |
| val second = broadcast.openSubscription() |
| launch(coroutineContext, CoroutineStart.UNDISPATCHED) { |
| expect(7) |
| assertEquals(2, second.receive()) // suspends |
| assertFalse(second.isClosedForReceive) |
| expect(11) |
| assertNull(second.receiveOrNull()) // suspends |
| assertTrue(second.isClosedForReceive) |
| expect(15) |
| } |
| expect(8) |
| broadcast.send(2) |
| expect(9) |
| yield() // to first & second receivers |
| expect(12) |
| broadcast.close() |
| expect(13) |
| assertTrue(broadcast.isClosedForSend) |
| yield() // to first & second receivers |
| finish(16) |
| } |
| |
| @Test |
| fun testSendSuspend() = runTest { |
| expect(1) |
| val broadcast = ArrayBroadcastChannel<Int>(1) |
| val first = broadcast.openSubscription() |
| launch(coroutineContext) { |
| expect(4) |
| assertEquals(1, first.receive()) |
| expect(5) |
| assertEquals(2, first.receive()) |
| expect(6) |
| } |
| expect(2) |
| broadcast.send(1) // puts to buffer, receiver not running yet |
| expect(3) |
| broadcast.send(2) // suspends |
| finish(7) |
| } |
| |
| @Test |
| fun testConcurrentSendCompletion() = runTest { |
| expect(1) |
| val broadcast = ArrayBroadcastChannel<Int>(1) |
| val sub = broadcast.openSubscription() |
| // launch 3 concurrent senders (one goes buffer, two other suspend) |
| for (x in 1..3) { |
| launch(coroutineContext, CoroutineStart.UNDISPATCHED) { |
| expect(x + 1) |
| broadcast.send(x) |
| } |
| } |
| // and close it for send |
| expect(5) |
| broadcast.close() |
| // now must receive all 3 items |
| expect(6) |
| assertFalse(sub.isClosedForReceive) |
| for (x in 1..3) |
| assertEquals(x, sub.receiveOrNull()) |
| // and receive close signal |
| assertNull(sub.receiveOrNull()) |
| assertTrue(sub.isClosedForReceive) |
| finish(7) |
| } |
| |
| @Test |
| fun testForgetUnsubscribed() = runTest { |
| expect(1) |
| val broadcast = ArrayBroadcastChannel<Int>(1) |
| broadcast.send(1) |
| broadcast.send(2) |
| broadcast.send(3) |
| expect(2) // should not suspend anywhere above |
| val sub = broadcast.openSubscription() |
| launch(coroutineContext, CoroutineStart.UNDISPATCHED) { |
| expect(3) |
| assertEquals(4, sub.receive()) // suspends |
| expect(5) |
| } |
| expect(4) |
| broadcast.send(4) // sends |
| yield() |
| finish(6) |
| } |
| |
| @Test |
| fun testReceiveFullAfterClose() = runTest { |
| val channel = BroadcastChannel<Int>(10) |
| val sub = channel.openSubscription() |
| // generate into buffer & close |
| for (x in 1..5) channel.send(x) |
| channel.close() |
| // make sure all of them are consumed |
| check(!sub.isClosedForReceive) |
| for (x in 1..5) check(sub.receive() == x) |
| check(sub.receiveOrNull() == null) |
| check(sub.isClosedForReceive) |
| } |
| |
| @Test |
| fun testCloseSubDuringIteration() = runTest { |
| val channel = BroadcastChannel<Int>(1) |
| // launch generator (for later) in this context |
| launch(coroutineContext) { |
| for (x in 1..5) channel.send(x) |
| channel.close() |
| } |
| // start consuming |
| val sub = channel.openSubscription() |
| var expected = 0 |
| sub.consumeEach { |
| check(it == ++expected) |
| if (it == 2) { |
| sub.cancel() |
| } |
| } |
| check(expected == 2) |
| } |
| |
| @Test |
| fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) { |
| val channel = BroadcastChannel<Int>(1) |
| val sub = channel.openSubscription() |
| assertFalse(sub.isClosedForReceive) |
| sub.cancel() |
| assertTrue(sub.isClosedForReceive) |
| sub.receive() |
| } |
| |
| @Test |
| fun testCancelWithCause() = runTest({ it is TestException }) { |
| val channel = BroadcastChannel<Int>(1) |
| val subscription = channel.openSubscription() |
| subscription.cancel(TestException()) |
| subscription.receiveOrNull() |
| } |
| |
| @Test |
| fun testReceiveNoneAfterCancel() = runTest { |
| val channel = BroadcastChannel<Int>(10) |
| val sub = channel.openSubscription() |
| // generate into buffer & cancel |
| for (x in 1..5) channel.send(x) |
| channel.cancel() |
| assertTrue(channel.isClosedForSend) |
| assertTrue(sub.isClosedForReceive) |
| check(sub.receiveOrNull() == null) |
| } |
| |
| private class TestException : Exception() |
| } |