blob: b946969cddee7ac35e13c53a1835474739f4082f [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.experimental.channels
6
7import kotlinx.coroutines.experimental.*
8import kotlin.coroutines.experimental.*
9import kotlin.test.*
10
/**
 * Tests for [ConflatedBroadcastChannel]. Execution order inside each test is pinned with the
 * `expect(n)` / `finish(n)` checkpoints, so the statement order in the test bodies is significant.
 */
class ConflatedBroadcastChannelTest : TestBase() {

    /**
     * Opens two subscriptions, starts two undispatched coroutines that each call `receive()` before
     * anything is sent, then sends a single element. The first coroutine cancels its own subscription
     * right after receiving; both coroutines must complete.
     */
    @Test
    fun testConcurrentModification() = runTest {
        val broadcast = ConflatedBroadcastChannel<Int>()
        val cancellingSub = broadcast.openSubscription()
        val plainSub = broadcast.openSubscription()

        val cancellingReceiver = launch(Unconfined, CoroutineStart.UNDISPATCHED) {
            expect(1)
            cancellingSub.receive()
            cancellingSub.cancel()
        }

        val plainReceiver = launch(Unconfined, CoroutineStart.UNDISPATCHED) {
            expect(2)
            plainSub.receive()
        }

        expect(3)
        broadcast.send(1)
        joinAll(cancellingReceiver, plainReceiver)
        finish(4)
    }

    /**
     * Walks a channel created without an initial value through three sends and a close, with one
     * subscriber opened before the first send and another opened after it, checking `value` /
     * `valueOrNull` on the channel and the elements seen by each subscriber along the way.
     */
    @Test
    fun testBasicScenario() = runTest {
        expect(1)
        val channel = ConflatedBroadcastChannel<String>()
        // No element has been sent yet: `value` throws, `valueOrNull` is null.
        val failureBeforeSend = exceptionFromNotInline { channel.value }
        assertTrue(failureBeforeSend is IllegalStateException)
        assertNull(channel.valueOrNull)

        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
            expect(2)
            val firstSub = channel.openSubscription()
            assertNull(firstSub.poll())
            expect(3)
            val firstElement = firstSub.receive() // suspends
            assertEquals("one", firstElement)
            expect(6)
            val secondElement = firstSub.receive() // suspends
            assertEquals("two", secondElement)
            expect(12)
            firstSub.cancel()
            expect(13)
        }

        expect(4)
        channel.send("one") // does not suspend
        assertEquals("one", channel.value)
        assertEquals("one", channel.valueOrNull)
        expect(5)
        yield() // to receiver
        expect(7)
        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
            expect(8)
            val secondSub = channel.openSubscription()
            val firstElement = secondSub.receive() // does not suspend
            assertEquals("one", firstElement)
            expect(9)
            val secondElement = secondSub.receive() // suspends
            assertEquals("two", secondElement)
            expect(14)
            val thirdElement = secondSub.receive() // suspends
            assertEquals("three", thirdElement)
            expect(17)
            val afterClose = secondSub.receiveOrNull() // suspends until closed
            assertNull(afterClose)
            expect(20)
            secondSub.cancel()
            expect(21)
        }

        expect(10)
        channel.send("two") // does not suspend
        assertEquals("two", channel.value)
        assertEquals("two", channel.valueOrNull)
        expect(11)
        yield() // to both receivers
        expect(15)
        channel.send("three") // does not suspend
        assertEquals("three", channel.value)
        assertEquals("three", channel.valueOrNull)
        expect(16)
        yield() // to second receiver
        expect(18)
        channel.close()
        // After close: `value` throws again and `valueOrNull` is null.
        val failureAfterClose = exceptionFromNotInline { channel.value }
        assertTrue(failureAfterClose is IllegalStateException)
        assertNull(channel.valueOrNull)
        expect(19)
        yield() // to second receiver
        val sendFailure = exceptionFrom { channel.send("four") }
        assertTrue(sendFailure is ClosedSendChannelException)
        finish(22)
    }

    /**
     * Creates the channel with an initial value, lets a subscriber receive that value, and then
     * closes the channel while the subscriber is in its second `receive()`, which must fail with
     * [ClosedReceiveChannelException].
     */
    @Test
    fun testInitialValueAndReceiveClosed() = runTest {
        expect(1)
        val channel = ConflatedBroadcastChannel(1)
        assertEquals(1, channel.value)
        assertEquals(1, channel.valueOrNull)
        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
            expect(2)
            val subscription = channel.openSubscription()
            val initial = subscription.receive()
            assertEquals(1, initial)
            expect(3)
            val receiveFailure = exceptionFrom { subscription.receive() } // suspends
            assertTrue(receiveFailure is ClosedReceiveChannelException)
            expect(6)
        }
        expect(4)
        channel.close()
        expect(5)
        yield() // to child
        finish(7)
    }

    /**
     * Runs [block] and returns whatever it threw, or `null` when it completed normally.
     * Declared `inline`; the tests in this class pass it lambdas that call suspending functions.
     */
    inline fun exceptionFrom(block: () -> Unit): Throwable? =
        try {
            block()
            null
        } catch (thrown: Throwable) {
            thrown
        }

    // Workaround for KT-23921
    /**
     * Non-inline counterpart of [exceptionFrom]: runs [block] and returns the thrown [Throwable],
     * or `null` when [block] completed normally.
     */
    fun exceptionFromNotInline(block: () -> Unit): Throwable? =
        try {
            block()
            null
        } catch (thrown: Throwable) {
            thrown
        }
}