blob: 7d7b33c3632d15d253f93f66b9bd0e79319bf06d [file] [log] [blame]
Roman Elizarove6e8ce82017-06-05 17:04:39 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarove6e8ce82017-06-05 17:04:39 +03003 */
4
5package kotlinx.coroutines.experimental.channels
6
Roman Elizarove6e8ce82017-06-05 17:04:39 +03007import kotlinx.coroutines.experimental.*
Roman Elizarov656bcd92017-06-07 09:24:19 +03008import org.junit.After
Roman Elizarove6e8ce82017-06-05 17:04:39 +03009import org.junit.Test
10import java.util.concurrent.atomic.AtomicInteger
11import java.util.concurrent.atomic.AtomicReference
Roman Elizarov9fe5f462018-02-21 19:05:52 +030012import kotlin.coroutines.experimental.*
Roman Elizarove6e8ce82017-06-05 17:04:39 +030013
14class ConflatedChannelCloseStressTest : TestBase() {
15
16 val nSenders = 2
17 val testSeconds = 3 * stressTestMultiplier
18
19 val curChannel = AtomicReference<ConflatedChannel<Int>>(ConflatedChannel())
20 val sent = AtomicInteger()
21 val closed = AtomicInteger()
22 val received = AtomicInteger()
23
Roman Elizarov656bcd92017-06-07 09:24:19 +030024 val pool = newFixedThreadPoolContext(nSenders + 2, "TestStressClose")
25
26 @After
Roman Elizarovd9ae2bc2017-10-20 17:36:56 +080027 fun tearDown() {
28 pool.close()
29 }
Roman Elizarov656bcd92017-06-07 09:24:19 +030030
Roman Elizarove6e8ce82017-06-05 17:04:39 +030031 @Test
32 fun testStressClose() = runBlocking<Unit> {
Roman Elizarovd3d335b2017-10-21 17:43:53 +030033 println("--- ConflatedChannelCloseStressTest with nSenders=$nSenders")
Roman Elizarove6e8ce82017-06-05 17:04:39 +030034 val senderJobs = List(nSenders) { Job() }
35 val senders = List(nSenders) { senderId ->
Roman Elizarov656bcd92017-06-07 09:24:19 +030036 launch(pool) {
Roman Elizarove6e8ce82017-06-05 17:04:39 +030037 var x = senderId
38 try {
39 while (isActive) {
40 try {
41 curChannel.get().offer(x)
42 x += nSenders
43 sent.incrementAndGet()
44 } catch (e: ClosedSendChannelException) {
45 // ignore
46 }
47 }
48 } finally {
49 senderJobs[senderId].cancel()
50 }
51 }
52 }
53 val closerJob = Job()
Roman Elizarov656bcd92017-06-07 09:24:19 +030054 val closer = launch(pool) {
Roman Elizarove6e8ce82017-06-05 17:04:39 +030055 try {
56 while (isActive) {
57 flipChannel()
58 closed.incrementAndGet()
59 yield()
60 }
61 } finally {
62 closerJob.cancel()
63 }
64 }
Roman Elizarov656bcd92017-06-07 09:24:19 +030065 val receiver = async(pool) {
Roman Elizarove6e8ce82017-06-05 17:04:39 +030066 while (isActive) {
67 curChannel.get().receiveOrNull()
68 received.incrementAndGet()
69 }
70 }
71 // print stats while running
72 repeat(testSeconds) {
73 delay(1000)
74 printStats()
75 }
76 println("Stopping")
77 senders.forEach { it.cancel() }
78 closer.cancel()
79 // wait them to complete
80 println("waiting for senders...")
81 senderJobs.forEach { it.join() }
82 println("waiting for closer...")
83 closerJob.join()
84 // close cur channel
85 println("Closing channel and signalling receiver...")
86 flipChannel()
87 curChannel.get().close(StopException())
88 /// wait for receiver do complete
89 println("Waiting for receiver...")
90 try {
91 receiver.await()
Roman Elizarov448528e2017-07-21 18:11:20 +030092 error("Receiver should not complete normally")
Roman Elizarove6e8ce82017-06-05 17:04:39 +030093 } catch (e: StopException) {
94 // ok
95 }
96 // print stats
97 println("--- done")
98 printStats()
99 }
100
101 private fun flipChannel() {
102 val oldChannel = curChannel.get()
103 val newChannel = ConflatedChannel<Int>()
104 curChannel.set(newChannel)
Roman Elizarov448528e2017-07-21 18:11:20 +0300105 check(oldChannel.close())
Roman Elizarove6e8ce82017-06-05 17:04:39 +0300106 }
107
108 private fun printStats() {
109 println("sent ${sent.get()}, closed ${closed.get()}, received ${received.get()}")
110 }
111
112 class StopException : Exception()
113}