blob: 71da1e4b55aed0dfdf1590287076ac39aa65208d [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Roman Elizarov187eace2017-01-31 09:39:58 +030017package kotlinx.coroutines.experimental.channels
18
19import kotlinx.coroutines.experimental.*
Roman Elizarov1216e912017-02-22 09:57:06 +030020import kotlinx.coroutines.experimental.selects.select
21import org.junit.Assert.assertEquals
Roman Elizarov187eace2017-01-31 09:39:58 +030022import org.junit.Test
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030023import org.junit.runner.RunWith
24import org.junit.runners.Parameterized
Roman Elizarov187eace2017-01-31 09:39:58 +030025import java.util.*
Roman Elizarov1216e912017-02-22 09:57:06 +030026import java.util.concurrent.atomic.AtomicReference
Roman Elizarov187eace2017-01-31 09:39:58 +030027
/**
 * Stress test for cancel atomicity of channel send & receive operations, including their select versions.
 *
 * One sender coroutine and one receiver coroutine exchange consecutive integers over [channel] while the
 * test body keeps cancelling and relaunching them at random. After the run the test asserts that the
 * receiver never saw a value lower than expected ([dupCnt]) and, when [kind] is not conflated, that it
 * never saw a value higher than expected ([missedCnt]) and that the last sent and last received values
 * are equal.
 *
 * The test is parameterized: it is instantiated once for every [TestChannelKind] value.
 */
@RunWith(Parameterized::class)
class ChannelAtomicCancelStressTest(val kind: TestChannelKind) : TestBase() {
    /** How long the cancel/restart loop keeps running, in milliseconds. */
    val TEST_DURATION = 3000L * stressTestMultiplier

    /** The channel under test, created from [kind]. */
    val channel = kind.create()

    /** Gets one element every time a sender coroutine leaves its [cancellable] block. */
    val senderDone = ArrayChannel<Boolean>(1)

    /** Gets one element every time a receiver coroutine leaves its [cancellable] block. */
    val receiverDone = ArrayChannel<Boolean>(1)

    /** Last value whose send operation completed. */
    var lastSent = 0

    /** Last value taken from [channel] by a receiver. */
    var lastReceived = 0

    /** Number of [stopSender] invocations. */
    var stoppedSender = 0

    /** Number of [stopReceiver] invocations. */
    var stoppedReceiver = 0

    /** Number of times a received value was greater than `lastReceived + 1`. */
    var missedCnt = 0

    /** Number of times a received value was less than `lastReceived + 1`. */
    var dupCnt = 0

    /** First failure recorded via [fail]; holds `null` while nothing has failed. */
    val failed = AtomicReference<Throwable>()

    /** The most recently launched sender coroutine. */
    lateinit var sender: Job

    /** The most recently launched receiver coroutine. */
    lateinit var receiver: Job

    /**
     * Records [e] as the test failure unless a failure has been recorded already.
     *
     * @return `true` when [e] was stored, `false` when an earlier failure is kept.
     */
    fun fail(e: Throwable): Boolean {
        return failed.compareAndSet(null, e)
    }

    /**
     * Runs [block] and, however it terminates, offers a completion signal to [done].
     *
     * Any exception thrown by [block] is rethrown; exceptions other than [CancellationException] are
     * recorded with [fail] first. A rejected offer to [done] is recorded as a failure as well.
     */
    inline fun cancellable(done: ArrayChannel<Boolean>, block: () -> Unit) {
        try {
            block()
        } catch (e: Throwable) {
            val isCancellation = e is CancellationException
            if (!isCancellation) {
                fail(e)
            }
            throw e
        } finally {
            val signalled = done.offer(true)
            if (!signalled) {
                fail(IllegalStateException("failed to offer to done channel"))
            }
        }
    }

    /**
     * Launches a sender and a receiver, then randomly restarts one of them or yields until either
     * [TEST_DURATION] has elapsed or a failure has been recorded. Finally stops both coroutines,
     * prints the collected statistics and verifies them.
     */
    @Test
    fun testAtomicCancelStress() = runBlocking<Unit> {
        println("--- ChannelAtomicCancelStressTest $kind")
        val deadline = System.currentTimeMillis() + TEST_DURATION
        launchSender()
        launchReceiver()
        val random = Random()
        while (true) {
            if (System.currentTimeMillis() >= deadline) break
            if (failed.get() != null) break
            when (random.nextInt(3)) {
                0 -> restartSender()
                1 -> restartReceiver()
                2 -> yield() // just yield (burn a little time)
            }
        }
        stopSender()
        stopReceiver()
        printStatistics()
        verifyOutcome()
    }

    /** Cancels the current sender and launches a fresh one. */
    private suspend fun restartSender() {
        stopSender()
        launchSender()
    }

    /** Cancels the current receiver and launches a fresh one. */
    private suspend fun restartReceiver() {
        stopReceiver()
        launchReceiver()
    }

    /** Prints the counters gathered during the run. */
    private fun printStatistics() {
        println(" Sent $lastSent ints to channel")
        println(" Received $lastReceived ints from channel")
        println(" Stopped sender $stoppedSender times")
        println("Stopped receiver $stoppedReceiver times")
        println(" Missed $missedCnt ints")
        println(" Duplicated $dupCnt ints")
    }

    /**
     * Rethrows a recorded failure if there is one, then asserts on the counters. The missed-value and
     * last-sent/last-received checks are skipped for conflated kinds.
     */
    private fun verifyOutcome() {
        val failure = failed.get()
        if (failure != null) throw failure
        assertEquals(0, dupCnt)
        if (kind.isConflated) return
        assertEquals(0, missedCnt)
        assertEquals(lastSent, lastReceived)
    }

    /**
     * Starts a new sender coroutine and stores it in [sender].
     *
     * The coroutine loops forever, sending `lastSent + 1` either with a plain `send` or with the
     * `onSend` select clause (chosen at random), and advances [lastSent] only after the send completed.
     */
    fun launchSender() {
        sender = launch(CommonPool, start = CoroutineStart.ATOMIC) {
            val random = Random()
            cancellable(senderDone) {
                var iteration = 0
                while (true) {
                    val candidate = lastSent + 1
                    when (random.nextInt(2)) {
                        0 -> channel.send(candidate)
                        1 -> select { channel.onSend(candidate) {} }
                        else -> error("cannot happen")
                    }
                    lastSent = candidate // update on success
                    val dueForYield = iteration % 1000 == 0
                    iteration++
                    // yield periodically to check cancellation on LinkedListChannel
                    if (dueForYield) yield()
                }
            }
        }
    }

    /** Cancels the current [sender] and suspends until it has signalled [senderDone]. */
    suspend fun stopSender() {
        stoppedSender += 1
        sender.cancel()
        senderDone.receive()
    }

    /**
     * Starts a new receiver coroutine and stores it in [receiver].
     *
     * The coroutine loops forever, taking a value either with a plain `receive` or with the `onReceive`
     * select clause (chosen at random), comparing it against `lastReceived + 1` to update [missedCnt] /
     * [dupCnt], and then storing it in [lastReceived].
     */
    fun launchReceiver() {
        receiver = launch(CommonPool, start = CoroutineStart.ATOMIC) {
            val random = Random()
            cancellable(receiverDone) {
                while (true) {
                    val value = when (random.nextInt(2)) {
                        0 -> channel.receive()
                        1 -> select { channel.onReceive { it } }
                        else -> error("cannot happen")
                    }
                    val expected = lastReceived + 1
                    when {
                        value > expected -> missedCnt++
                        value < expected -> dupCnt++
                    }
                    lastReceived = value
                }
            }
        }
    }

    /** Cancels the current [receiver] and suspends until it has signalled [receiverDone]. */
    suspend fun stopReceiver() {
        stoppedReceiver += 1
        receiver.cancel()
        receiverDone.receive()
    }

    companion object {
        /** Supplies one single-element parameter array per [TestChannelKind] value. */
        @Parameterized.Parameters(name = "{0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> =
            TestChannelKind.values().map { channelKind -> arrayOf<Any>(channelKind) }
    }
}