blob: 7da81cc8f74d5e1b028eb02bc2c7d2dc2f7e3508 [file] [log] [blame]
Roman Elizarove6e8ce82017-06-05 17:04:39 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarove6e8ce82017-06-05 17:04:39 +03003 */
4
5package kotlinx.coroutines.experimental.channels
6
7import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +03008import org.hamcrest.MatcherAssert.*
9import org.hamcrest.core.*
10import org.junit.*
11import java.util.concurrent.atomic.*
12import kotlin.coroutines.experimental.*
Roman Elizarove6e8ce82017-06-05 17:04:39 +030013
/**
 * Stress test in which [nSenders] coroutines offer increasing integers to a shared
 * [ConflatedBroadcastChannel] while [nReceivers] coroutines repeatedly open a fresh
 * subscription, take a single value from it and cancel that subscription again.
 *
 * The test passes when every sender and every receiver runs to completion within
 * [timeLimit] and the number of offered events equals [nEvents].
 */
class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
    val nSenders = 2
    val nReceivers = 3
    val nEvents = 1_000_000 * stressTestMultiplier
    val timeLimit = 30_000L * stressTestMultiplier // 30 sec, scaled by stressTestMultiplier

    val broadcast = ConflatedBroadcastChannel<Int>()

    // Counters are updated from several coroutines, hence the atomics.
    val sendersCompleted = AtomicInteger()
    val receiversCompleted = AtomicInteger()
    val sentTotal = AtomicInteger()
    val receivedTotal = AtomicInteger()

    /**
     * Runs the senders and receivers concurrently, prints progress once per second, and
     * finally verifies that all of them completed and that exactly [nEvents] events were sent.
     */
    @Test
    fun testStressNotify() = runBlocking<Unit> {
        println("--- ConflatedBroadcastChannelNotifyStressTest")
        // The range 0 until nEvents is partitioned between senders by `i % nSenders`,
        // so every event index is offered exactly once across all senders.
        val senders = List(nSenders) { senderId ->
            launch(CommonPool + CoroutineName("Sender$senderId")) {
                repeat(nEvents) { i ->
                    if (i % nSenders == senderId) {
                        broadcast.offer(i)
                        sentTotal.incrementAndGet()
                        yield()
                    }
                }
                sendersCompleted.incrementAndGet()
            }
        }
        val receivers = List(nReceivers) { receiverId ->
            launch(CommonPool + CoroutineName("Receiver$receiverId")) {
                var last = -1
                while (isActive) {
                    val i = waitForEvent()
                    // Count a value only when it is newer than the last one this receiver saw.
                    if (i > last) {
                        receivedTotal.incrementAndGet()
                        last = i
                    }
                    // The terminating event (value nEvents) is offered after all senders finish.
                    if (i >= nEvents) break
                    yield()
                }
                receiversCompleted.incrementAndGet()
            }
        }
        // print progress
        val progressJob = launch(coroutineContext) {
            var seconds = 0
            while (true) {
                delay(1000)
                println("${++seconds}: Sent ${sentTotal.get()}, received ${receivedTotal.get()}")
            }
        }
        try {
            withTimeout(timeLimit) {
                senders.forEach { it.join() }
                broadcast.offer(nEvents) // last event to signal receivers termination
                receivers.forEach { it.join() }
            }
        } catch (e: CancellationException) {
            println("!!! Test timed out $e")
            // Senders and receivers are launched with CommonPool + CoroutineName rather than
            // with this coroutine's context, so they are presumably not cancelled together
            // with it. Stop them explicitly so they do not keep running after a timed-out
            // test; cancelling an already completed job is harmless.
            (senders + receivers).forEach { it.cancel() }
        }
        progressJob.cancel()
        println("Tested with nSenders=$nSenders, nReceivers=$nReceivers")
        println("Completed successfully ${sendersCompleted.get()} sender coroutines")
        println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
        println(" Sent ${sentTotal.get()} events")
        println(" Received ${receivedTotal.get()} events")
        assertThat(sendersCompleted.get(), IsEqual(nSenders))
        assertThat(receiversCompleted.get(), IsEqual(nReceivers))
        assertThat(sentTotal.get(), IsEqual(nEvents))
    }

    /**
     * Opens a new subscription on [broadcast], receives one value from it and returns
     * that value.
     *
     * The subscription is cancelled in a `finally` block so that it is released even when
     * `receive()` throws (for example because the calling coroutine is cancelled while
     * it is suspended waiting for a value).
     */
    suspend fun waitForEvent(): Int {
        val subscription = broadcast.openSubscription()
        try {
            return subscription.receive()
        } finally {
            subscription.cancel()
        }
    }
}