blob: d381e819601a944a01a0d35215c1898cc8f4c38d [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.experimental.channels
6
7import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +03008import kotlin.coroutines.experimental.*
Vsevolod Tolstopyatov96191342018-04-20 18:13:33 +03009import kotlin.test.*
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030010
/**
 * Tests for [ArrayBroadcastChannel] and for subscriptions opened on buffered broadcast channels.
 *
 * `expect(n)` / `finish(n)` and `runTest` are inherited from [TestBase]; the numbered calls appear
 * to pin the order in which steps of a test execute — confirm against TestBase.
 */
class ArrayBroadcastChannelTest : TestBase() {

    /**
     * Two subscribers start receiving before anything is sent; the first one cancels its own
     * subscription right after it receives. A single `send` must let both jobs complete.
     */
    @Test
    fun testConcurrentModification() = runTest {
        val channel = ArrayBroadcastChannel<Int>(1)
        val s1 = channel.openSubscription()
        val s2 = channel.openSubscription()

        val job1 = launch(Unconfined, CoroutineStart.UNDISPATCHED) {
            expect(1)
            s1.receive()
            s1.cancel()
        }

        val job2 = launch(Unconfined, CoroutineStart.UNDISPATCHED) {
            expect(2)
            s2.receive()
        }

        expect(3)
        channel.send(1)
        joinAll(job1, job2)
        finish(4)
    }

    /**
     * Walks two subscribers through send / receive / close. The second subscriber is opened after
     * element 1 was already sent and is asserted to see only element 2 and then the close signal.
     */
    @Test
    fun testBasic() = runTest {
        expect(1)
        val broadcast = ArrayBroadcastChannel<Int>(1)
        assertFalse(broadcast.isClosedForSend)
        val first = broadcast.openSubscription()
        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
            expect(2)
            assertEquals(1, first.receive()) // suspends
            assertFalse(first.isClosedForReceive)
            expect(5)
            assertEquals(2, first.receive()) // suspends
            assertFalse(first.isClosedForReceive)
            expect(10)
            assertNull(first.receiveOrNull()) // suspends
            assertTrue(first.isClosedForReceive)
            expect(14)
        }
        expect(3)
        broadcast.send(1)
        expect(4)
        yield() // to the first receiver
        expect(6)

        val second = broadcast.openSubscription()
        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
            expect(7)
            assertEquals(2, second.receive()) // suspends
            assertFalse(second.isClosedForReceive)
            expect(11)
            assertNull(second.receiveOrNull()) // suspends
            assertTrue(second.isClosedForReceive)
            expect(15)
        }
        expect(8)
        broadcast.send(2)
        expect(9)
        yield() // to first & second receivers
        expect(12)
        broadcast.close()
        expect(13)
        assertTrue(broadcast.isClosedForSend)
        yield() // to first & second receivers
        finish(16)
    }

    /**
     * With a capacity-1 channel and one subscriber that has not run yet, the first `send` goes to
     * the buffer and the second `send` suspends until the subscriber starts receiving.
     */
    @Test
    fun testSendSuspend() = runTest {
        expect(1)
        val broadcast = ArrayBroadcastChannel<Int>(1)
        val first = broadcast.openSubscription()
        launch(coroutineContext) {
            expect(4)
            assertEquals(1, first.receive())
            expect(5)
            assertEquals(2, first.receive())
            expect(6)
        }
        expect(2)
        broadcast.send(1) // puts to buffer, receiver not running yet
        expect(3)
        broadcast.send(2) // suspends
        finish(7)
    }

    /**
     * Three senders are started against a capacity-1 channel, then the channel is closed.
     * The subscriber must still receive all three elements, in order, before the close signal.
     */
    @Test
    fun testConcurrentSendCompletion() = runTest {
        expect(1)
        val broadcast = ArrayBroadcastChannel<Int>(1)
        val sub = broadcast.openSubscription()
        // launch 3 concurrent senders (one goes buffer, two other suspend)
        for (x in 1..3) {
            launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
                expect(x + 1)
                broadcast.send(x)
            }
        }
        // and close it for send
        expect(5)
        broadcast.close()
        // now must receive all 3 items
        expect(6)
        assertFalse(sub.isClosedForReceive)
        for (x in 1..3)
            assertEquals(x, sub.receiveOrNull())
        // and receive close signal
        assertNull(sub.receiveOrNull())
        assertTrue(sub.isClosedForReceive)
        finish(7)
    }

    /**
     * Elements sent while there are no subscribers do not suspend the sender, and a subscription
     * opened afterwards receives only what is sent after it was opened (element 4 here).
     */
    @Test
    fun testForgetUnsubscribed() = runTest {
        expect(1)
        val broadcast = ArrayBroadcastChannel<Int>(1)
        broadcast.send(1)
        broadcast.send(2)
        broadcast.send(3)
        expect(2) // should not suspend anywhere above
        val sub = broadcast.openSubscription()
        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
            expect(3)
            assertEquals(4, sub.receive()) // suspends
            expect(5)
        }
        expect(4)
        broadcast.send(4) // sends
        yield()
        finish(6)
    }

    /**
     * Elements buffered before `close()` must all be delivered to the subscriber; only after the
     * last one does the subscription report closed.
     */
    @Test
    fun testReceiveFullAfterClose() = runTest {
        val channel = BroadcastChannel<Int>(10)
        val sub = channel.openSubscription()
        // generate into buffer & close
        for (x in 1..5) channel.send(x)
        channel.close()
        // make sure all of them are consumed
        assertFalse(sub.isClosedForReceive)
        for (x in 1..5) assertEquals(x, sub.receive())
        assertNull(sub.receiveOrNull())
        assertTrue(sub.isClosedForReceive)
    }

    /**
     * Cancelling the subscription from inside `consumeEach` stops the iteration: exactly the
     * elements 1 and 2 are observed even though the generator tries to send 1..5.
     */
    @Test
    fun testCloseSubDuringIteration() = runTest {
        val channel = BroadcastChannel<Int>(1)
        // launch generator (for later) in this context
        launch(coroutineContext) {
            for (x in 1..5) channel.send(x)
            channel.close()
        }
        // start consuming
        val sub = channel.openSubscription()
        var expected = 0
        sub.consumeEach {
            assertEquals(++expected, it)
            if (it == 2) {
                sub.cancel()
            }
        }
        assertEquals(2, expected)
    }

    /**
     * A cancelled subscription reports `isClosedForReceive`, and `receive()` on it fails.
     * The predicate passed to `runTest` is presumably the expected-exception matcher
     * ([ClosedReceiveChannelException] here) — confirm against TestBase.
     */
    @Test
    fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) {
        val channel = BroadcastChannel<Int>(1)
        val sub = channel.openSubscription()
        assertFalse(sub.isClosedForReceive)
        sub.cancel()
        assertTrue(sub.isClosedForReceive)
        sub.receive()
    }

    /**
     * Cancels a subscription with an explicit cause and then calls `receiveOrNull()`.
     * The predicate passed to `runTest` is presumably the expected-exception matcher
     * ([TestException] here) — confirm against TestBase.
     */
    @Test
    fun testCancelWithCause() = runTest({ it is TestException }) {
        val channel = BroadcastChannel<Int>(1)
        val subscription = channel.openSubscription()
        subscription.cancel(TestException())
        subscription.receiveOrNull()
    }

    /**
     * After `cancel()` on the broadcast channel itself, the channel is closed for send, the
     * subscription is closed for receive, and none of the buffered elements are delivered.
     */
    @Test
    fun testReceiveNoneAfterCancel() = runTest {
        val channel = BroadcastChannel<Int>(10)
        val sub = channel.openSubscription()
        // generate into buffer & cancel
        for (x in 1..5) channel.send(x)
        channel.cancel()
        assertTrue(channel.isClosedForSend)
        assertTrue(sub.isClosedForReceive)
        assertNull(sub.receiveOrNull())
    }

    /** Marker exception used as the cancellation cause in [testCancelWithCause]. */
    private class TestException : Exception()
}