/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
| 4 | |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 5 | package kotlinx.coroutines.channels |
Roman Elizarov | 187eace | 2017-01-31 09:39:58 +0300 | [diff] [blame] | 6 | |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 7 | import kotlinx.coroutines.* |
| 8 | import kotlinx.coroutines.selects.* |
Roman Elizarov | c32579e | 2018-09-09 19:21:43 +0300 | [diff] [blame] | 9 | import org.junit.* |
| 10 | import org.junit.Assert.* |
| 11 | import org.junit.runner.* |
| 12 | import org.junit.runners.* |
Roman Elizarov | 187eace | 2017-01-31 09:39:58 +0300 | [diff] [blame] | 13 | import java.util.* |
Roman Elizarov | c32579e | 2018-09-09 19:21:43 +0300 | [diff] [blame] | 14 | import java.util.concurrent.atomic.* |
Roman Elizarov | 187eace | 2017-01-31 09:39:58 +0300 | [diff] [blame] | 15 | |
/**
 * Tests cancel atomicity for channel send & receive operations, including their select versions.
 *
 * One sender and one receiver coroutine run on a dedicated two-thread pool and are started with
 * [CoroutineStart.ATOMIC]. While they exchange consecutive integers through a channel of the given
 * [kind], the test thread randomly cancels and restarts either of them for [TEST_DURATION] milliseconds.
 * At the end the test asserts that no element was received twice and, for non-conflated channels,
 * that no element was skipped and that everything sent was received.
 */
@RunWith(Parameterized::class)
class ChannelAtomicCancelStressTest(private val kind: TestChannelKind) : TestBase() {
    companion object {
        /** Supplies one parameter set per [TestChannelKind] value. */
        @Parameterized.Parameters(name = "{0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> = TestChannelKind.values().map { arrayOf<Any>(it) }
    }

    // How long the cancel/restart loop runs, in milliseconds (scaled by the stress-test multiplier).
    private val TEST_DURATION = 1000L * stressTestMultiplier

    // Sender and receiver coroutines are launched in this scope, on this two-thread pool.
    private val dispatcher = newFixedThreadPoolContext(2, "ChannelAtomicCancelStressTest")
    private val scope = CoroutineScope(dispatcher)

    // The channel under test.
    private val channel = kind.create()

    // Each worker offers `true` here when it terminates; stopSender/stopReceiver wait on these.
    private val senderDone = Channel<Boolean>(1)
    private val receiverDone = Channel<Boolean>(1)

    // Last value successfully sent. Written by the sender coroutine and read by the test thread,
    // so it is volatile to make the cross-thread reads well-defined.
    @Volatile
    private var lastSent = 0

    // Last value received. Written by the receiver coroutine and polled by the sender coroutine
    // (LINKED_LIST throttling below) from another pool thread, hence volatile.
    @Volatile
    private var lastReceived = 0

    // Number of times each worker was cancelled; only touched by the test thread.
    private var stoppedSender = 0
    private var stoppedReceiver = 0

    // Counts of received values that were greater (missed) or smaller (duplicated) than expected.
    private var missedCnt = 0
    private var dupCnt = 0

    /** Holds the first non-cancellation failure reported via [fail], or `null` if none. */
    val failed = AtomicReference<Throwable>()

    lateinit var sender: Job
    lateinit var receiver: Job

    /** Closes the thread pool created for this test instance. */
    @After
    fun tearDown() {
        dispatcher.close()
    }

    /**
     * Records [e] as the test failure unless a failure was already recorded.
     *
     * @return `true` if [e] was stored, `false` if an earlier failure is already present.
     */
    fun fail(e: Throwable) = failed.compareAndSet(null, e)

    /**
     * Runs [block], reports any non-cancellation exception via [fail] and rethrows it, and always
     * signals completion by offering `true` to [done].
     */
    private inline fun cancellable(done: Channel<Boolean>, block: () -> Unit) {
        try {
            block()
        } catch (e: Throwable) {
            if (e !is CancellationException) fail(e)
            throw e
        } finally {
            // The done channel has capacity 1 and is drained by each stop call,
            // so a rejected offer means the stop/launch protocol was violated.
            if (!done.offer(true))
                fail(IllegalStateException("failed to offer to done channel"))
        }
    }

    @Test
    fun testAtomicCancelStress() = runBlocking {
        println("--- ChannelAtomicCancelStressTest $kind")
        val deadline = System.currentTimeMillis() + TEST_DURATION
        launchSender()
        launchReceiver()
        val rnd = Random()
        // Keep disturbing the workers until time runs out or one of them reports a failure.
        while (System.currentTimeMillis() < deadline && failed.get() == null) {
            when (rnd.nextInt(3)) {
                0 -> { // cancel & restart sender
                    stopSender()
                    launchSender()
                }
                1 -> { // cancel & restart receiver
                    stopReceiver()
                    launchReceiver()
                }
                2 -> yield() // just yield (burn a little time)
            }
        }
        stopSender()
        stopReceiver()
        println(" Sent $lastSent ints to channel")
        println(" Received $lastReceived ints from channel")
        println(" Stopped sender $stoppedSender times")
        println("Stopped receiver $stoppedReceiver times")
        println(" Missed $missedCnt ints")
        println(" Duplicated $dupCnt ints")
        failed.get()?.let { throw it }
        assertEquals(0, dupCnt)
        // Misses and an unequal sent/received count are only checked for non-conflated channels.
        if (!kind.isConflated) {
            assertEquals(0, missedCnt)
            assertEquals(lastSent, lastReceived)
        }
    }

    /**
     * Starts a new sender coroutine that sends `lastSent + 1` forever, randomly choosing between
     * `send` and `select { onSend }`, and advances [lastSent] only after a send succeeds.
     */
    private fun launchSender() {
        sender = scope.launch(start = CoroutineStart.ATOMIC) {
            val rnd = Random()
            cancellable(senderDone) {
                var counter = 0
                while (true) {
                    val trySend = lastSent + 1
                    when (rnd.nextInt(2)) {
                        0 -> channel.send(trySend)
                        1 -> select { channel.onSend(trySend) {} }
                        else -> error("cannot happen")
                    }
                    lastSent = trySend // update on success
                    when {
                        // must artificially slow down LINKED_LIST sender to avoid overwhelming receiver and going OOM
                        kind == TestChannelKind.LINKED_LIST -> while (lastSent > lastReceived + 100) yield()
                        // yield periodically to check cancellation on conflated channels
                        kind.isConflated -> if (counter++ % 100 == 0) yield()
                    }
                }
            }
        }
    }

    /** Cancels the current sender and suspends until it has signalled termination. */
    private suspend fun stopSender() {
        stoppedSender++
        sender.cancel()
        senderDone.receive()
    }

    /**
     * Starts a new receiver coroutine that receives forever, randomly choosing between `receive`
     * and `select { onReceive }`, and compares every value with `lastReceived + 1`.
     */
    private fun launchReceiver() {
        receiver = scope.launch(start = CoroutineStart.ATOMIC) {
            val rnd = Random()
            cancellable(receiverDone) {
                while (true) {
                    val received = when (rnd.nextInt(2)) {
                        0 -> channel.receive()
                        1 -> select { channel.onReceive { it } }
                        else -> error("cannot happen")
                    }
                    val expected = lastReceived + 1
                    // A larger value means something was skipped; a smaller one means a repeat.
                    if (received > expected)
                        missedCnt++
                    if (received < expected)
                        dupCnt++
                    lastReceived = received
                }
            }
        }
    }

    /** Cancels the current receiver and suspends until it has signalled termination. */
    private suspend fun stopReceiver() {
        stoppedReceiver++
        receiver.cancel()
        receiverDone.receive()
    }
}