blob: b347574891e5fb978c9c6aad93575d6ba3b1fa36 [file] [log] [blame]
Roman Elizarov43918972017-10-07 21:00:06 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental.channels
18
19import kotlinx.coroutines.experimental.*
Roman Elizarovaa461cf2018-04-11 13:20:29 +030020import kotlinx.coroutines.experimental.timeunit.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030021import org.junit.*
22import org.junit.runner.*
23import org.junit.runners.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030024import java.util.concurrent.atomic.*
25import kotlin.coroutines.experimental.*
Roman Elizarov43918972017-10-07 21:00:06 +030026
/**
 * Creates a broadcast channel and repeatedly opens a new subscription, receives a single event
 * from it, and cancels it, in order to stress-test the logic of opening a subscription
 * to a broadcast channel while events are being concurrently sent to it.
 */
@RunWith(Parameterized::class)
class BroadcastChannelSubStressTest(
    val kind: TestBroadcastChannelKind
) : TestBase() {
    companion object {
        /** Supplies one parameter set per [TestBroadcastChannelKind] value. */
        @Parameterized.Parameters(name = "{0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> =
            TestBroadcastChannelKind.values().map { arrayOf<Any>(it) }
    }

    // Number of one-second progress checks performed by testStress.
    private val nSeconds = 5 * stressTestMultiplier
    private val broadcast = kind.create<Long>()

    // Counters shared between the worker coroutines and the monitoring loop.
    private val sentTotal = AtomicLong()
    private val receivedTotal = AtomicLong()

    /**
     * Runs a sender and a re-subscribing receiver concurrently for [nSeconds] seconds,
     * checking once per second that the sender keeps making progress, and then
     * cancels both workers, requiring them to complete within five seconds.
     */
    @Test
    fun testStress() = runBlocking {
        println("--- BroadcastChannelSubStressTest $kind")
        val ctx = coroutineContext + CommonPool
        val sender =
            launch(context = ctx + CoroutineName("Sender")) {
                // Sends a strictly increasing sequence 1, 2, 3, ... until cancelled.
                while (isActive) {
                    broadcast.send(sentTotal.incrementAndGet())
                }
            }
        val receiver =
            launch(context = ctx + CoroutineName("Receiver")) {
                var last = -1L
                while (isActive) {
                    val channel = broadcast.openSubscription()
                    try {
                        val i = channel.receive()
                        // A new subscription must never deliver a value older than one already seen.
                        check(i >= last) { "Last was $last, got $i" }
                        // Only conflated kinds are allowed to deliver the same value to a new subscription.
                        if (!kind.isConflated) check(i != last) { "Last was $last, got it again" }
                        receivedTotal.incrementAndGet()
                        last = i
                    } finally {
                        // Always release the subscription, including when receive() is interrupted
                        // by cancellation or a check above fails, so it is not left open.
                        channel.cancel()
                    }
                }
            }
        var prevSent = -1L
        repeat(nSeconds) { sec ->
            delay(1000)
            val curSent = sentTotal.get()
            println("${sec + 1}: Sent $curSent, received ${receivedTotal.get()}")
            // The sender must have made progress during the last second.
            check(curSent > prevSent) { "Send stalled at $curSent events" }
            prevSent = curSent
        }
        withTimeout(5, TimeUnit.SECONDS) {
            sender.cancelAndJoin()
            receiver.cancelAndJoin()
        }
    }
}