blob: 795edcbe9bd4a8192608cb1cf16c2d082eefe2f6 [file] [log] [blame]
Roman Elizarovc22a1c72018-05-30 21:58:02 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarovc22a1c72018-05-30 21:58:02 +03003 */
4
5package kotlinx.coroutines.experimental.internal
6
7import kotlinx.atomicfu.*
8import kotlinx.coroutines.experimental.*
9import java.util.concurrent.*
10import kotlin.concurrent.*
11import kotlin.test.*
12
13// Tests many short queues to stress copy/resize
14class LockFreeMPSCQueueStressTest : TestBase() {
15 private val nSeconds = 3 * stressTestMultiplier
16 private val nProducers = 4
17 private val batchSize = 100
18
19 private val batch = atomic(0)
20 private val produced = atomic(0L)
21 private val consumed = atomic(0L)
22 private var expected = LongArray(nProducers)
23
24 private var queue = atomic<LockFreeMPSCQueue<Item>?>(null)
25 private val done = atomic(0)
26 private val doneProducers = atomic(0)
27
28 private val barrier = CyclicBarrier(nProducers + 2)
29
30 private class Item(val producer: Int, val index: Long)
31
32 @Test
33 fun testStress() {
34 val threads = mutableListOf<Thread>()
35 threads += thread(name = "Pacer", start = false) {
36 while (done.value == 0) {
37 queue.value = LockFreeMPSCQueue()
38 batch.value = 0
39 doneProducers.value = 0
40 barrier.await() // start consumers & producers
41 barrier.await() // await consumers & producers
42 }
43 queue.value = null
44 println("Pacer done")
45 barrier.await() // wakeup the rest
46 }
47 threads += thread(name = "Consumer", start = false) {
48 while (true) {
49 barrier.await()
50 val queue = queue.value ?: break
51 while (true) {
52 val item = queue.removeFirstOrNull()
53 if (item == null) {
54 if (doneProducers.value == nProducers && queue.isEmpty) break // that's it
55 continue // spin to retry
56 }
57 consumed.incrementAndGet()
58 val eItem = expected[item.producer]++
59 if (eItem != item.index) error("Expected $eItem but got ${item.index} from Producer-${item.producer}")
60 }
61 barrier.await()
62 }
63 println("Consumer done")
64 }
65 val producers = List(nProducers) { producer ->
66 thread(name = "Producer-$producer", start = false) {
67 var index = 0L
68 while (true) {
69 barrier.await()
70 val queue = queue.value ?: break
71 while (true) {
72 if (batch.incrementAndGet() >= batchSize) break
73 check(queue.addLast(Item(producer, index++))) // never closed
74 produced.incrementAndGet()
75 }
76 doneProducers.incrementAndGet()
77 barrier.await()
78 }
79 println("Producer-$producer done")
80 }
81 }
82 threads += producers
83 threads.forEach {
84 it.setUncaughtExceptionHandler { t, e ->
85 System.err.println("Thread $t failed: $e")
86 e.printStackTrace()
87 done.value = 1
88 error("Thread $t failed", e)
89 }
90 }
91 threads.forEach { it.start() }
92 for (second in 1..nSeconds) {
93 Thread.sleep(1000)
94 println("$second: produced=${produced.value}, consumed=${consumed.value}")
95 if (done.value == 1) break
96 }
97 done.value = 1
98 threads.forEach { it.join() }
99 println("T: produced=${produced.value}, consumed=${consumed.value}")
100 assertEquals(produced.value, consumed.value)
101 }
102}