blob: 9f0012c1b57d706e6f6b118507921bbc9ecc233c [file] [log] [blame]
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03001/*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +03005package kotlinx.coroutines.experimental.channels
6
7import kotlinx.coroutines.experimental.*
8import kotlinx.coroutines.experimental.selects.*
9import org.junit.Test
10import org.junit.runner.*
11import org.junit.runners.*
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +030012import kotlin.test.*
13
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +030014@RunWith(Parameterized::class)
15class TimerChannelCommonTest(private val channelFactory: Channel) : TestBase() {
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +030016 companion object {
17 @Parameterized.Parameters(name = "{0}")
18 @JvmStatic
19 fun params(): Collection<Array<Any>> =
20 Channel.values().map { arrayOf<Any>(it) }
21 }
22
23 enum class Channel {
Roman Elizarovb5328a72018-06-06 18:31:21 +030024 FIXED_PERIOD {
25 override fun invoke(delay: Long, initialDelay: Long) =
26 ticker(delay, initialDelay = initialDelay, mode = TickerMode.FIXED_PERIOD)
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +030027 },
28
29 FIXED_DELAY {
Roman Elizarovb5328a72018-06-06 18:31:21 +030030 override fun invoke(delay: Long, initialDelay: Long) =
31 ticker(delay, initialDelay = initialDelay, mode = TickerMode.FIXED_DELAY)
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +030032 };
33
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +030034 abstract operator fun invoke(delay: Long, initialDelay: Long = 0): ReceiveChannel<Unit>
35 }
36
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +030037 @Test
38 fun testDelay() = runTest {
39 val delayChannel = channelFactory(delay = 100)
40 delayChannel.checkNotEmpty()
41 delayChannel.checkEmpty()
42
43 delay(50)
44 delayChannel.checkEmpty()
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +030045 delay(51)
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +030046 delayChannel.checkNotEmpty()
47
48 delayChannel.cancel()
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +030049 delay(51)
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +030050 delayChannel.checkEmpty()
51 delayChannel.cancel()
52 }
53
54 @Test
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +030055 fun testInitialDelay() = runTest {
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +030056 val delayChannel = channelFactory(initialDelay = 75, delay = 100)
57 delayChannel.checkEmpty()
58 delay(50)
59 delayChannel.checkEmpty()
60 delay(30)
61 delayChannel.checkNotEmpty()
62
63 // Regular delay
64 delay(75)
65 delayChannel.checkEmpty()
66 delay(26)
67 delayChannel.checkNotEmpty()
68 delayChannel.cancel()
69 }
70
71
72 @Test
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +030073 fun testReceive() = runTest {
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +030074 val delayChannel = channelFactory(delay = 100)
75 delayChannel.checkNotEmpty()
76 var value = withTimeoutOrNull(75) {
77 delayChannel.receive()
78 1
79 }
80
81 assertNull(value)
82 value = withTimeoutOrNull(26) {
83 delayChannel.receive()
84 1
85 }
86
87 assertNotNull(value)
88 delayChannel.cancel()
89 }
90
91 @Test
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +030092 fun testComplexOperator() = runTest {
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +030093 val producer = produce {
94 for (i in 1..7) {
95 send(i)
96 delay(100)
97 }
98 }
99
100 val averages = producer.averageInTimeWindow(300).toList()
101 assertEquals(listOf(2.0, 5.0, 7.0), averages)
102 }
103
104 private fun ReceiveChannel<Int>.averageInTimeWindow(timespan: Long) = produce {
105 val delayChannel = channelFactory(delay = timespan, initialDelay = timespan)
106 var sum = 0
107 var n = 0
108 whileSelect {
109 this@averageInTimeWindow.onReceiveOrNull {
110 when (it) {
111 null -> {
112 // Send leftovers and bail out
113 if (n != 0) send(sum / n.toDouble())
114 false
115 }
116 else -> {
117 sum += it
118 ++n
119 true
120 }
121 }
122 }
123
124 // Timeout, send aggregated average and reset counters
125 delayChannel.onReceive {
126 send(sum / n.toDouble())
127 sum = 0
128 n = 0
129 true
130 }
131 }
132
133 delayChannel.cancel()
134 }
135
136 @Test
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +0300137 fun testStress() = runTest {
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +0300138 // No OOM/SOE
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +0300139 val iterations = 100_000 * stressTestMultiplier
Vsevolod Tolstopyatov1dbc25e2018-04-18 14:50:26 +0300140 val delayChannel = channelFactory(0)
141 repeat(iterations) {
142 delayChannel.receive()
143 }
144
145 delayChannel.cancel()
146 }
147
148 @Test(expected = IllegalArgumentException::class)
149 fun testNegativeDelay() {
150 channelFactory(-1)
151 }
152
153 @Test(expected = IllegalArgumentException::class)
154 fun testNegativeInitialDelay() {
155 channelFactory(initialDelay = -1, delay = 100)
156 }
157}
158
159fun ReceiveChannel<Unit>.checkEmpty() = assertNull(poll())
160
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +0300161fun ReceiveChannel<Unit>.checkNotEmpty() {
162 assertNotNull(poll())
163 assertNull(poll())
164}