blob: 6223213d67fe8549958ea66a81ca1256f23d4fdd [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
Roman Elizarov0950dfa2018-07-13 10:33:25 +03005package kotlinx.coroutines.channels
Roman Elizarov187eace2017-01-31 09:39:58 +03006
Roman Elizarov0950dfa2018-07-13 10:33:25 +03007import kotlinx.coroutines.*
8import kotlinx.coroutines.selects.*
Roman Elizarovc32579e2018-09-09 19:21:43 +03009import org.junit.*
10import org.junit.Assert.*
11import org.junit.runner.*
12import org.junit.runners.*
Roman Elizarov187eace2017-01-31 09:39:58 +030013import java.util.*
Roman Elizarovc32579e2018-09-09 19:21:43 +030014import java.util.concurrent.atomic.*
Roman Elizarov187eace2017-01-31 09:39:58 +030015
/**
 * Stress test for cancel atomicity of channel send & receive operations, covering both the plain
 * suspending calls and their `select` clause counterparts.
 *
 * A sender and a receiver coroutine exchange consecutive integers over a channel of the given [kind]
 * while the test body keeps cancelling and relaunching them at random. The receiver counts every gap
 * and every repeat it observes in the sequence. Duplicates are never tolerated; for kinds that are not
 * conflated no element may be missed either, and the last received value must equal the last sent one.
 */
@RunWith(Parameterized::class)
class ChannelAtomicCancelStressTest(private val kind: TestChannelKind) : TestBase() {
    // Length of the cancel/restart loop in milliseconds, scaled by stressTestMultiplier.
    private val TEST_DURATION = 1000L * stressTestMultiplier

    // Context used for both workers; closed again in tearDown().
    private val dispatcher = newFixedThreadPoolContext(2, "ChannelAtomicCancelStressTest")
    private val scope = CoroutineScope(dispatcher)

    // The channel under test, plus one "done" channel per worker that receives a token
    // every time that worker's body exits (see cancellable()).
    private val channel = kind.create()
    private val senderDone = Channel<Boolean>(1)
    private val receiverDone = Channel<Boolean>(1)

    // Last value successfully handed to the channel / last value taken out of it.
    private var lastSent = 0
    private var lastReceived = 0

    // Number of times each worker has been stopped.
    private var stoppedSender = 0
    private var stoppedReceiver = 0

    // Gaps and repeats observed by the receiver.
    private var missedCnt = 0
    private var dupCnt = 0

    // Holds the first failure recorded through fail(); stays null while everything is fine.
    val failed = AtomicReference<Throwable>()

    lateinit var sender: Job
    lateinit var receiver: Job

    @After
    fun tearDown() {
        dispatcher.close()
    }

    /**
     * Records [e] as the test failure unless one has been recorded already.
     * Returns `true` when [e] was the one stored.
     */
    fun fail(e: Throwable) = failed.compareAndSet(null, e)

    /**
     * Runs [block] and always offers a token to [done] once it exits.
     *
     * Any throwable other than [CancellationException] is recorded via [fail]; every throwable is
     * rethrown. If the token cannot be offered to [done], that is recorded as a failure too.
     */
    private inline fun cancellable(done: Channel<Boolean>, block: () -> Unit) {
        try {
            block()
        } catch (e: Throwable) {
            if (e !is CancellationException) {
                fail(e)
            }
            throw e
        } finally {
            val signalled = done.offer(true)
            if (!signalled) {
                fail(IllegalStateException("failed to offer to done channel"))
            }
        }
    }

    @Test
    fun testAtomicCancelStress() = runBlocking {
        println("--- ChannelAtomicCancelStressTest $kind")
        val stopAt = System.currentTimeMillis() + TEST_DURATION
        launchSender()
        launchReceiver()
        val random = Random()
        // Keep disturbing the workers until time runs out or a failure has been recorded.
        while (System.currentTimeMillis() < stopAt && failed.get() == null) {
            when (random.nextInt(3)) {
                0 -> {
                    // cancel & restart sender
                    stopSender()
                    launchSender()
                }
                1 -> {
                    // cancel & restart receiver
                    stopReceiver()
                    launchReceiver()
                }
                2 -> yield() // just yield (burn a little time)
            }
        }
        stopSender()
        stopReceiver()
        println(" Sent $lastSent ints to channel")
        println(" Received $lastReceived ints from channel")
        println(" Stopped sender $stoppedSender times")
        println("Stopped receiver $stoppedReceiver times")
        println(" Missed $missedCnt ints")
        println(" Duplicated $dupCnt ints")
        // A recorded failure takes precedence over the counter assertions below.
        val failure = failed.get()
        if (failure != null) throw failure
        assertEquals(0, dupCnt)
        if (!kind.isConflated) {
            assertEquals(0, missedCnt)
            assertEquals(lastSent, lastReceived)
        }
    }

    /**
     * Starts a new sender coroutine and stores it in [sender].
     *
     * The sender keeps pushing `lastSent + 1` into the channel, choosing at random between `send`
     * and a `select` with `onSend`, and advances [lastSent] only after the operation returned.
     */
    private fun launchSender() {
        // NOTE(review): ATOMIC start presumably guarantees the body (and thus the done signal) runs even
        // when the job is cancelled right after launch — confirm against the CoroutineStart docs.
        sender = scope.launch(start = CoroutineStart.ATOMIC) {
            val random = Random()
            cancellable(senderDone) {
                var iteration = 0
                while (true) {
                    val next = lastSent + 1
                    when (random.nextInt(2)) {
                        0 -> channel.send(next)
                        1 -> select { channel.onSend(next) {} }
                        else -> error("cannot happen")
                    }
                    lastSent = next // update on success
                    if (kind == TestChannelKind.LINKED_LIST) {
                        // must artificially slow down LINKED_LIST sender to avoid overwhelming receiver and going OOM
                        while (lastSent > lastReceived + 100) yield()
                    } else if (kind.isConflated) {
                        // yield periodically to check cancellation on conflated channels
                        if (iteration++ % 100 == 0) yield()
                    }
                }
            }
        }
    }

    /** Cancels the current [sender] and suspends until its body has signalled [senderDone]. */
    private suspend fun stopSender() {
        stoppedSender += 1
        sender.cancel()
        senderDone.receive()
    }

    /**
     * Starts a new receiver coroutine and stores it in [receiver].
     *
     * The receiver takes values from the channel, choosing at random between `receive` and a
     * `select` with `onReceive`, and compares each one against `lastReceived + 1`: a larger value
     * bumps [missedCnt], a smaller one bumps [dupCnt].
     */
    private fun launchReceiver() {
        receiver = scope.launch(start = CoroutineStart.ATOMIC) {
            val random = Random()
            cancellable(receiverDone) {
                while (true) {
                    val received = when (random.nextInt(2)) {
                        0 -> channel.receive()
                        1 -> select { channel.onReceive { it } }
                        else -> error("cannot happen")
                    }
                    val expected = lastReceived + 1
                    when {
                        received > expected -> missedCnt++
                        received < expected -> dupCnt++
                    }
                    lastReceived = received
                }
            }
        }
    }

    /** Cancels the current [receiver] and suspends until its body has signalled [receiverDone]. */
    private suspend fun stopReceiver() {
        stoppedReceiver += 1
        receiver.cancel()
        receiverDone.receive()
    }

    companion object {
        /** Supplies one parameter set per [TestChannelKind] value. */
        @Parameterized.Parameters(name = "{0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> = TestChannelKind.values().map { arrayOf<Any>(it) }
    }
}