Fixed bug in LockFreeLinkedList (invariant & lock-freedom violation)
Migrated atomic ops to AtomicFU library for ease of code maintenance
Implement lock-freedom testing in LockFreeLinkedListAtomicStressLFTest
diff --git a/kotlinx-coroutines-core/pom.xml b/kotlinx-coroutines-core/pom.xml
index bb118e4..169cbd6 100644
--- a/kotlinx-coroutines-core/pom.xml
+++ b/kotlinx-coroutines-core/pom.xml
@@ -47,14 +47,55 @@
<version>1.8.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>atomicfu</artifactId>
+ <version>${atomicfu.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
<sourceDirectory>src/main/kotlin</sourceDirectory>
<testSourceDirectory>src/test/kotlin</testSourceDirectory>
-
<plugins>
+ <!-- compile Kotlin files to staging directory -->
+ <plugin>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <version>${kotlin.version}</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <configuration>
+ <output>${project.build.directory}/classes-atomicfu</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- transform classes with AtomicFU plugin -->
+ <plugin>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>atomicfu-maven-plugin</artifactId>
+ <version>${atomicfu.version}</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>transform</goal>
+ </goals>
+ <configuration>
+ <verbose>true</verbose>
+ <input>${project.build.directory}/classes-atomicfu</input>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- additional configuration for tests -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
@@ -62,16 +103,34 @@
<execution>
<id>jdk16-test</id>
<phase>test</phase>
- <goals><goal>test</goal></goals>
+ <goals>
+ <goal>test</goal>
+ </goals>
<configuration>
<forkMode>once</forkMode>
<jvm>${env.JDK_16}/bin/java</jvm>
<argLine>-ea -Xmx1g -Xms1g -Djava.security.manager=kotlinx.coroutines.experimental.TestSecurityManager</argLine>
<excludes>
<exclude>**/*LinearizabilityTest.*</exclude>
+ <exclude>**/*LFTest.java</exclude>
</excludes>
</configuration>
</execution>
+ <!-- test lock-freedom on un-processed files -->
+ <execution>
+ <id>lockfreedom-test</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <forkMode>once</forkMode>
+ <classesDirectory>${project.build.directory}/classes-atomicfu</classesDirectory>
+ <includes>
+ <include>**/*LFTest.java</include>
+ </includes>
+ </configuration>
+ </execution>
</executions>
</plugin>
<plugin>
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
index 1f2dcd1..2992708 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
@@ -16,7 +16,8 @@
package kotlinx.coroutines.experimental.internal
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+import kotlinx.atomicfu.atomic
+import kotlinx.atomicfu.loop
private typealias Node = LockFreeLinkedListNode
@@ -67,27 +68,12 @@
*/
@Suppress("LeakingThis")
public open class LockFreeLinkedListNode {
- @Volatile
- private var _next: Any = this // Node | Removed | OpDescriptor
- @Volatile
- private var _prev: Any = this // Node | Removed
- @Volatile
- private var _removedRef: Removed? = null // lazily cached removed ref to this
-
- private companion object {
- @JvmField
- val NEXT: AtomicReferenceFieldUpdater<Node, Any> =
- AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Any::class.java, "_next")
- @JvmField
- val PREV: AtomicReferenceFieldUpdater<Node, Any> =
- AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Any::class.java, "_prev")
- @JvmField
- val REMOVED_REF: AtomicReferenceFieldUpdater<Node, Removed?> =
- AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Removed::class.java, "_removedRef")
- }
+ private val _next = atomic<Any>(this) // Node | Removed | OpDescriptor
+ private val _prev = atomic<Any>(this) // Node | Removed
+ private val _removedRef = atomic<Removed?>(null) // lazily cached removed ref to this
private fun removed(): Removed =
- _removedRef ?: Removed(this).also { REMOVED_REF.lazySet(this, it) }
+ _removedRef.value ?: Removed(this).also { _removedRef.lazySet(it) }
@PublishedApi
internal abstract class CondAddOp(
@@ -98,7 +84,7 @@
override fun complete(affected: Node, failure: Any?) {
val success = failure == null
val update = if (success) newNode else oldNext
- if (NEXT.compareAndSet(affected, this, update)) {
+ if (update != null && affected._next.compareAndSet( this, update)) {
// only the thread the makes this update actually finishes add operation
if (success) newNode.finishAdd(oldNext!!)
}
@@ -111,14 +97,13 @@
override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE
}
- public val isFresh: Boolean get() = _next === this
+ public val isFresh: Boolean get() = _next.value === this
public val isRemoved: Boolean get() = next is Removed
// LINEARIZABLE. Returns Node | Removed
public val next: Any get() {
- while (true) { // operation helper loop on _next
- val next = this._next
+ _next.loop { next ->
if (next !is OpDescriptor) return next
next.perform(this)
}
@@ -126,10 +111,9 @@
// LINEARIZABLE. Returns Node | Removed
public val prev: Any get() {
- while (true) { // insert helper loop on _prev
- val prev = this._prev
+ _prev.loop { prev ->
if (prev is Removed) return prev
- prev as Node // otherwise, it can be only node otherwise
+ prev as Node // otherwise, it can be only node
if (prev.next === this) return prev
correctPrev(prev, null)
}
@@ -138,12 +122,12 @@
// ------ addOneIfEmpty ------
public fun addOneIfEmpty(node: Node): Boolean {
- PREV.lazySet(node, this)
- NEXT.lazySet(node, this)
+ node._prev.lazySet(this)
+ node._next.lazySet(this)
while (true) {
val next = next
if (next !== this) return false // this is not an empty list!
- if (NEXT.compareAndSet(this, this, node)) {
+ if (_next.compareAndSet(this, node)) {
// added successfully (linearized add) -- fixup the list
node.finishAdd(this)
return true
@@ -207,9 +191,9 @@
@PublishedApi
internal fun addNext(node: Node, next: Node): Boolean {
- PREV.lazySet(node, this)
- NEXT.lazySet(node, next)
- if (!NEXT.compareAndSet(this, next, node)) return false
+ node._prev.lazySet(this)
+ node._next.lazySet(next)
+ if (!_next.compareAndSet(next, node)) return false
// added successfully (linearized add) -- fixup the list
node.finishAdd(next)
return true
@@ -218,10 +202,10 @@
// returns UNDECIDED, SUCCESS or FAILURE
@PublishedApi
internal fun tryCondAddNext(node: Node, next: Node, condAdd: CondAddOp): Int {
- PREV.lazySet(node, this)
- NEXT.lazySet(node, next)
+ node._prev.lazySet(this)
+ node._next.lazySet(next)
condAdd.oldNext = next
- if (!NEXT.compareAndSet(this, next, condAdd)) return UNDECIDED
+ if (!_next.compareAndSet(next, condAdd)) return UNDECIDED
// added operation successfully (linearized) -- complete it & fixup the list
return if (condAdd.perform(this) == null) SUCCESS else FAILURE
}
@@ -238,7 +222,7 @@
if (next is Removed) return false // was already removed -- don't try to help (original thread will take care)
if (next === this) return false // was not even added
val removed = (next as Node).removed()
- if (NEXT.compareAndSet(this, next, removed)) {
+ if (_next.compareAndSet(next, removed)) {
// was removed successfully (linearized remove) -- fixup the list
finishRemove(next)
return true
@@ -249,12 +233,16 @@
public open fun describeRemove() : AtomicDesc? {
if (isRemoved) return null // fast path if was already removed
return object : AbstractAtomicDesc() {
+ private val _originalNext = atomic<Node?>(null)
override val affectedNode: Node? get() = this@LockFreeLinkedListNode
- override var originalNext: Node? = null
+ override val originalNext get() = _originalNext.value
override fun failure(affected: Node, next: Any): Any? =
if (next is Removed) ALREADY_REMOVED else null
override fun onPrepare(affected: Node, next: Node): Any? {
- originalNext = next
+ // Note: onPrepare must use CAS to make sure the stale invocation is not
+ // going to overwrite the previous decision on successful preparation.
+ // Result of CAS is irrelevant, but we must ensure that it is set when invoker completes
+ _originalNext.compareAndSet(null, next)
return null // always success
}
override fun updatedNext(affected: Node, next: Node) = next.removed()
@@ -303,13 +291,13 @@
) : AbstractAtomicDesc() {
init {
// require freshly allocated node here
- check(node._next === node && node._prev === node)
+ check(node._next.value === node && node._prev.value === node)
}
final override fun takeAffectedNode(op: OpDescriptor): Node {
while (true) {
- val prev = queue._prev as Node // this sentinel node is never removed
- val next = prev._next
+ val prev = queue._prev.value as Node // this sentinel node is never removed
+ val next = prev._next.value
if (next === queue) return prev // all is good -> linked properly
if (next === op) return prev // all is good -> our operation descriptor is already there
if (next is OpDescriptor) { // some other operation descriptor -> help & retry
@@ -317,25 +305,31 @@
continue
}
// linked improperly -- help insert
- queue.correctPrev(prev, op)
+ val affected = queue.correctPrev(prev, op)
+ // we can find node which this operation is already affecting while trying to correct prev
+ if (affected != null) return affected
}
}
- final override var affectedNode: Node? = null
+ private val _affectedNode = atomic<Node?>(null)
+ final override val affectedNode: Node? get() = _affectedNode.value
final override val originalNext: Node? get() = queue
override fun retry(affected: Node, next: Any): Boolean = next !== queue
override fun onPrepare(affected: Node, next: Node): Any? {
- affectedNode = affected
+ // Note: onPrepare must use CAS to make sure the stale invocation is not
+ // going to overwrite the previous decision on successful preparation.
+ // Result of CAS is irrelevant, but we must ensure that it is set when invoker completes
+ _affectedNode.compareAndSet(null, affected)
return null // always success
}
override fun updatedNext(affected: Node, next: Node): Any {
// it is invoked only on successfully completion of operation, but this invocation can be stale,
// so we must use CAS to set both prev & next pointers
- PREV.compareAndSet(node, node, affected)
- NEXT.compareAndSet(node, node, queue)
+ node._prev.compareAndSet(node, affected)
+ node._next.compareAndSet(node, queue)
return node
}
@@ -347,12 +341,15 @@
public open class RemoveFirstDesc<T>(
@JvmField val queue: Node
) : AbstractAtomicDesc() {
+ private val _affectedNode = atomic<Node?>(null)
+ private val _originalNext = atomic<Node?>(null)
+
@Suppress("UNCHECKED_CAST")
public val result: T get() = affectedNode!! as T
final override fun takeAffectedNode(op: OpDescriptor): Node = queue.next as Node
- final override var affectedNode: Node? = null
- final override var originalNext: Node? = null
+ final override val affectedNode: Node? get() = _affectedNode.value
+ final override val originalNext: Node? get() = _originalNext.value
// check node predicates here, must signal failure if affect is not of type T
protected override fun failure(affected: Node, next: Any): Any? =
@@ -371,8 +368,11 @@
final override fun onPrepare(affected: Node, next: Node): Any? {
check(affected !is LockFreeLinkedListHead)
if (!validatePrepared(affected as T)) return REMOVE_PREPARED
- affectedNode = affected
- originalNext = next
+ // Note: onPrepare must use CAS to make sure the stale invocation is not
+ // going to overwrite the previous decision on successful preparation.
+ // Result of CAS is irrelevant, but we must ensure that it is set when invoker completes
+ _affectedNode.compareAndSet(null, affected)
+ _originalNext.compareAndSet(null, next)
return null // ok
}
@@ -404,21 +404,19 @@
if (decision === REMOVE_PREPARED) {
// remove element on failure
val removed = next.removed()
- if (NEXT.compareAndSet(affected, this, removed)) {
+ if (affected._next.compareAndSet(this, removed)) {
affected.helpDelete()
}
} else {
// some other failure -- mark as decided
op.tryDecide(decision)
// undo preparations
- NEXT.compareAndSet(affected, this, next)
+ affected._next.compareAndSet(this, next)
}
return decision
}
- check(desc.affectedNode === affected)
- check(desc.originalNext === next)
val update: Any = if (op.isDecided) next else op // restore if decision was already reached
- NEXT.compareAndSet(affected, this, update)
+ affected._next.compareAndSet(this, update)
return null // ok
}
}
@@ -428,10 +426,10 @@
while (true) { // lock free loop on next
val affected = takeAffectedNode(op)
// read its original next pointer first
- val next = affected._next
+ val next = affected._next.value
// then see if already reached consensus on overall operation
- if (op.isDecided) return null // already decided -- go to next desc
if (next === op) return null // already in process of operation -- all is good
+ if (op.isDecided) return null // already decided this operation -- go to next desc
if (next is OpDescriptor) {
// some other operation is in process -- help it
next.perform(affected)
@@ -442,7 +440,7 @@
if (failure != null) return failure // signal failure
if (retry(affected, next)) continue // retry operation
val prepareOp = PrepareOp(next as Node, op as AtomicOp<Node>, this)
- if (NEXT.compareAndSet(affected, next, prepareOp)) {
+ if (affected._next.compareAndSet(next, prepareOp)) {
// prepared -- complete preparations
val prepFail = prepareOp.perform(affected)
if (prepFail === REMOVE_PREPARED) continue // retry
@@ -454,9 +452,9 @@
final override fun complete(op: AtomicOp<*>, failure: Any?) {
val success = failure == null
val affectedNode = affectedNode ?: run { check(!success); return }
- val originalNext = this.originalNext ?: run { check(!success); return }
+ val originalNext = originalNext ?: run { check(!success); return }
val update = if (success) updatedNext(affectedNode, originalNext) else originalNext
- if (NEXT.compareAndSet(affectedNode, op, update)) {
+ if (affectedNode._next.compareAndSet(op, update)) {
if (success) finishOnSuccess(affectedNode, originalNext)
}
}
@@ -465,10 +463,9 @@
// ------ other helpers ------
private fun finishAdd(next: Node) {
- while (true) {
- val nextPrev = next._prev
+ next._prev.loop { nextPrev ->
if (nextPrev is Removed || this.next !== next) return // next was removed, remover fixes up links
- if (PREV.compareAndSet(next, nextPrev, this)) {
+ if (next._prev.compareAndSet(nextPrev, this)) {
if (this.next is Removed) {
// already removed
next.correctPrev(nextPrev as Node, null)
@@ -480,14 +477,14 @@
private fun finishRemove(next: Node) {
helpDelete()
- next.correctPrev(_prev.unwrap(), null)
+ next.correctPrev(_prev.value.unwrap(), null)
}
private fun markPrev(): Node {
- while (true) { // lock-free loop on prev
- val prev = this._prev
+ _prev.loop { prev ->
if (prev is Removed) return prev.ref
- if (PREV.compareAndSet(this, prev, (prev as Node).removed())) return prev
+ val removedPrev = (prev as Node).removed()
+ if (_prev.compareAndSet(prev, removedPrev)) return prev
}
}
@@ -496,7 +493,7 @@
internal fun helpDelete() {
var last: Node? = null // will set to the node left of prev when found
var prev: Node = markPrev()
- var next: Node = (this._next as Removed).ref
+ var next: Node = (this._next.value as Removed).ref
while (true) {
// move to the right until first non-removed node
val nextNext = next.next
@@ -510,11 +507,11 @@
if (prevNext is Removed) {
if (last != null) {
prev.markPrev()
- NEXT.compareAndSet(last, prev, prevNext.ref)
+ last._next.compareAndSet(prev, prevNext.ref)
prev = last
last = null
} else {
- prev = prev._prev.unwrap()
+ prev = prev._prev.value.unwrap()
}
continue
}
@@ -526,18 +523,20 @@
continue
}
// Now prev & next are Ok
- if (NEXT.compareAndSet(prev, this, next)) return // success!
+ if (prev._next.compareAndSet(this, next)) return // success!
}
}
// fixes prev links from this node
- private fun correctPrev(_prev: Node, op: OpDescriptor?) {
+ // returns affected node by this operation when this op is in progress (and nothing can be corrected)
+ // returns null otherwise (prev was corrected)
+ private fun correctPrev(_prev: Node, op: OpDescriptor?): Node? {
var prev: Node = _prev
var last: Node? = null // will be set so that last.next === prev
while (true) {
// move the the left until first non-removed node
- val prevNext = prev._next
- if (prevNext === op) return // part of the same op -- don't recurse
+ val prevNext = prev._next.value
+ if (prevNext === op) return prev // part of the same op -- don't recurse, didn't correct prev
if (prevNext is OpDescriptor) { // help & retry
prevNext.perform(prev)
continue
@@ -545,32 +544,32 @@
if (prevNext is Removed) {
if (last !== null) {
prev.markPrev()
- NEXT.compareAndSet(last, prev, prevNext.ref)
+ last._next.compareAndSet(prev, prevNext.ref)
prev = last
last = null
} else {
- prev = prev._prev.unwrap()
+ prev = prev._prev.value.unwrap()
}
continue
}
- val oldPrev = this._prev
- if (oldPrev is Removed) return // this node was removed, too -- its remover will take care
+ val oldPrev = this._prev.value
+ if (oldPrev is Removed) return null // this node was removed, too -- its remover will take care
if (prevNext !== this) {
// need to fixup next
last = prev
prev = prevNext as Node
continue
}
- if (oldPrev === prev) return // it is already linked as needed
- if (PREV.compareAndSet(this, oldPrev, prev)) {
- if (prev._prev !is Removed) return // finish only if prev was not concurrently removed
+ if (oldPrev === prev) return null // it is already linked as needed
+ if (this._prev.compareAndSet(oldPrev, prev)) {
+ if (prev._prev.value !is Removed) return null // finish only if prev was not concurrently removed
}
}
}
internal fun validateNode(prev: Node, next: Node) {
- check(prev === this._prev)
- check(next === this._next)
+ check(prev === this._prev.value)
+ check(next === this._next.value)
}
override fun toString(): String = "${this::class.java.simpleName}@${Integer.toHexString(System.identityHashCode(this))}"
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicStressLFTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicStressLFTest.kt
new file mode 100644
index 0000000..e455f7a
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicStressLFTest.kt
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.internal
+
+import kotlinx.atomicfu.LockFreedomTestEnvironment
+import kotlinx.coroutines.experimental.TestBase
+import org.junit.Assert.*
+import org.junit.Test
+import java.util.*
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * This stress test has 4 threads adding randomly to the list and them immediately undoing
+ * this addition by remove, and 4 threads trying to remove nodes from two lists simultaneously (atomically).
+ */
+class LockFreeLinkedListAtomicStressLFTest : TestBase() {
+ private val env = LockFreedomTestEnvironment("LockFreeLinkedListAtomicStressLFTest")
+
+ data class IntNode(val i: Int) : LockFreeLinkedListNode()
+
+ private val TEST_DURATION_SEC = 5 * stressTestMultiplier
+
+ val nLists = 4
+ val nAdderThreads = 4
+ val nRemoverThreads = 4
+
+ val lists = Array(nLists) { LockFreeLinkedListHead() }
+
+ val undone = AtomicLong()
+ val missed = AtomicLong()
+ val removed = AtomicLong()
+ val error = AtomicReference<Throwable>()
+
+ @Test
+ fun testStress() {
+ repeat(nAdderThreads) { threadId ->
+ val rnd = Random()
+ env.testThread(name = "adder-$threadId") {
+ when (rnd.nextInt(4)) {
+ 0 -> {
+ val list = lists[rnd.nextInt(nLists)]
+ val node = IntNode(threadId)
+ list.addLast(node)
+ burnTime(rnd)
+ tryRemove(node)
+ }
+ 1 -> {
+ // just to test conditional add
+ val list = lists[rnd.nextInt(nLists)]
+ val node = IntNode(threadId)
+ assertTrue(list.addLastIf(node, { true }))
+ burnTime(rnd)
+ tryRemove(node)
+ }
+ 2 -> {
+ // just to test failed conditional add and burn some time
+ val list = lists[rnd.nextInt(nLists)]
+ val node = IntNode(threadId)
+ assertFalse(list.addLastIf(node, { false }))
+ burnTime(rnd)
+ }
+ 3 -> {
+ // add two atomically
+ val idx1 = rnd.nextInt(nLists - 1)
+ val idx2 = idx1 + 1 + rnd.nextInt(nLists - idx1 - 1)
+ check(idx1 < idx2) // that is our global order
+ val list1 = lists[idx1]
+ val list2 = lists[idx2]
+ val node1 = IntNode(threadId)
+ val node2 = IntNode(-threadId - 1)
+ val add1 = list1.describeAddLast(node1)
+ val add2 = list2.describeAddLast(node2)
+ val op = object : AtomicOp<Any?>() {
+ override fun prepare(affected: Any?): Any? =
+ add1.prepare(this) ?:
+ add2.prepare(this)
+ override fun complete(affected: Any?, failure: Any?) {
+ add1.complete(this, failure)
+ add2.complete(this, failure)
+ }
+ }
+ assertTrue(op.perform(null) == null)
+ burnTime(rnd)
+ tryRemove(node1)
+ tryRemove(node2)
+ }
+ else -> error("Cannot happen")
+ }
+ }
+ }
+ repeat(nRemoverThreads) { threadId ->
+ val rnd = Random()
+ env.testThread(name = "remover-$threadId") {
+ val idx1 = rnd.nextInt(nLists - 1)
+ val idx2 = idx1 + 1 + rnd.nextInt(nLists - idx1 - 1)
+ check(idx1 < idx2) // that is our global order
+ val list1 = lists[idx1]
+ val list2 = lists[idx2]
+ val remove1 = list1.describeRemoveFirst()
+ val remove2 = list2.describeRemoveFirst()
+ val op = object : AtomicOp<Any?>() {
+ override fun prepare(affected: Any?): Any? =
+ remove1.prepare(this) ?:
+ remove2.prepare(this)
+ override fun complete(affected: Any?, failure: Any?) {
+ remove1.complete(this, failure)
+ remove2.complete(this, failure)
+ }
+ }
+ val success = op.perform(null) == null
+ if (success) removed.addAndGet(2)
+ }
+ }
+ env.performTest(TEST_DURATION_SEC) {
+ val _undone = undone.get()
+ val _missed = missed.get()
+ val _removed = removed.get()
+ println(" Adders undone $_undone node additions")
+ println(" Adders missed $_missed nodes")
+ println("Remover removed $_removed nodes")
+ }
+ error.get()?.let { throw it }
+ assertEquals(missed.get(), removed.get())
+ assertTrue(undone.get() > 0)
+ assertTrue(missed.get() > 0)
+ lists.forEach { it.validate() }
+ }
+
+ private val sink = IntArray(1024)
+
+ private fun burnTime(rnd: Random) {
+ if (rnd.nextInt(100) < 95) return // be quick, no wait 95% of time
+ do {
+ val x = rnd.nextInt(100)
+ val i = rnd.nextInt(sink.size)
+ repeat(x) { sink[i] += it }
+ } while (x >= 90)
+ }
+
+ private fun tryRemove(node: IntNode) {
+ if (node.remove())
+ undone.incrementAndGet()
+ else
+ missed.incrementAndGet()
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicStressTest.kt
deleted file mode 100644
index b477981..0000000
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicStressTest.kt
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Copyright 2016-2017 JetBrains s.r.o.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kotlinx.coroutines.experimental.internal
-
-import kotlinx.coroutines.experimental.TestBase
-import org.junit.Assert.*
-import org.junit.Test
-import java.util.*
-import java.util.concurrent.atomic.AtomicInteger
-import kotlin.concurrent.thread
-
-/**
- * This stress test has 4 threads adding randomly to the list and them immediately undoing
- * this addition by remove, and 4 threads trying to remove nodes from two lists simultaneously (atomically).
- */
-class LockFreeLinkedListAtomicStressTest : TestBase() {
- data class IntNode(val i: Int) : LockFreeLinkedListNode()
-
- val TEST_DURATION = 5000L * stressTestMultiplier
-
- val threads = mutableListOf<Thread>()
- val nLists = 4
- val nAdderThreads = 4
- val nRemoverThreads = 4
- val completedAdder = AtomicInteger()
- val completedRemover = AtomicInteger()
-
- val lists = Array(nLists) { LockFreeLinkedListHead() }
-
- val undone = AtomicInteger()
- val missed = AtomicInteger()
- val removed = AtomicInteger()
-
- @Test
- fun testStress() {
- val deadline = System.currentTimeMillis() + TEST_DURATION
- repeat(nAdderThreads) { threadId ->
- threads += thread(start = false, name = "adder-$threadId") {
- val rnd = Random()
- while (System.currentTimeMillis() < deadline) {
- when (rnd.nextInt(4)) {
- 0 -> {
- val list = lists[rnd.nextInt(nLists)]
- val node = IntNode(threadId)
- list.addLast(node)
- burnTime(rnd)
- tryRemove(node)
- }
- 1 -> {
- // just to test conditional add
- val list = lists[rnd.nextInt(nLists)]
- val node = IntNode(threadId)
- assertTrue(list.addLastIf(node, { true }))
- burnTime(rnd)
- tryRemove(node)
- }
- 2 -> {
- // just to test failed conditional add and burn some time
- val list = lists[rnd.nextInt(nLists)]
- val node = IntNode(threadId)
- assertFalse(list.addLastIf(node, { false }))
- burnTime(rnd)
- }
- 3 -> {
- // add two atomically
- val idx1 = rnd.nextInt(nLists - 1)
- val idx2 = idx1 + 1 + rnd.nextInt(nLists - idx1 - 1)
- check(idx1 < idx2) // that is our global order
- val list1 = lists[idx1]
- val list2 = lists[idx2]
- val node1 = IntNode(threadId)
- val node2 = IntNode(-threadId - 1)
- val add1 = list1.describeAddLast(node1)
- val add2 = list2.describeAddLast(node2)
- val op = object : AtomicOp<Any?>() {
- override fun prepare(affected: Any?): Any? = add1.prepare(this) ?: add2.prepare(this)
- override fun complete(affected: Any?, failure: Any?) {
- add1.complete(this, failure)
- add2.complete(this, failure)
- }
- }
- assertTrue(op.perform(null) == null)
- burnTime(rnd)
- tryRemove(node1)
- tryRemove(node2)
- }
- else -> error("Cannot happen")
- }
- }
- completedAdder.incrementAndGet()
- }
- }
- repeat(nRemoverThreads) { threadId ->
- threads += thread(start = false, name = "remover-$threadId") {
- val rnd = Random()
- while (System.currentTimeMillis() < deadline) {
- val idx1 = rnd.nextInt(nLists - 1)
- val idx2 = idx1 + 1 + rnd.nextInt(nLists - idx1 - 1)
- check(idx1 < idx2) // that is our global order
- val list1 = lists[idx1]
- val list2 = lists[idx2]
- val remove1 = list1.describeRemoveFirst()
- val remove2 = list2.describeRemoveFirst()
- val op = object : AtomicOp<Any?>() {
- override fun prepare(affected: Any?): Any? = remove1.prepare(this) ?: remove2.prepare(this)
- override fun complete(affected: Any?, failure: Any?) {
- remove1.complete(this, failure)
- remove2.complete(this, failure)
- }
- }
- val success = op.perform(null) == null
- if (success) removed.addAndGet(2)
-
- }
- completedRemover.incrementAndGet()
- }
- }
- threads.forEach(Thread::start)
- threads.forEach(Thread::join)
- println("Completed successfully ${completedAdder.get()} adder threads")
- println("Completed successfully ${completedRemover.get()} remover threads")
- println(" Adders undone ${undone.get()} node additions")
- println(" Adders missed ${missed.get()} nodes")
- println("Remover removed ${removed.get()} nodes")
- assertEquals(nAdderThreads, completedAdder.get())
- assertEquals(nRemoverThreads, completedRemover.get())
- assertEquals(missed.get(), removed.get())
- assertTrue(undone.get() > 0)
- assertTrue(missed.get() > 0)
- lists.forEach { it.validate() }
- }
-
- private fun burnTime(rnd: Random) {
- when (rnd.nextInt(3)) {
- 0 -> {} // nothing -- be quick
- 1 -> Thread.yield() // burn some time
- 2 -> Thread.sleep(1) // burn more time
- else -> error("Cannot happen")
- }
- }
-
- private fun tryRemove(node: IntNode) {
- if (node.remove())
- undone.incrementAndGet()
- else
- missed.incrementAndGet()
- }
-}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 3e1ca50..ea2f366 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,7 @@
<kotlin.version>1.1.3-2</kotlin.version>
<dokka.version>0.9.15-eap-1</dokka.version>
<junit.version>4.12</junit.version>
+ <atomicfu.version>0.2</atomicfu.version>
<maven.compiler.source>1.6</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target>
<core.docs.url>https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/</core.docs.url>
@@ -70,6 +71,15 @@
<repositories>
<repository>
+ <id>central</id>
+ <url>http://jcenter.bintray.com</url>
+ </repository>
+ <repository>
+ <id>bintray-kotlin-kotlinx</id>
+ <name>bintray</name>
+ <url>http://kotlin.bintray.com/kotlinx</url>
+ </repository>
+ <repository>
<id>bintray-kotlin-eap</id>
<name>bintray-kotlin-eap</name>
<url>http://dl.bintray.com/kotlin/kotlin-eap</url>
@@ -78,6 +88,15 @@
<pluginRepositories>
<pluginRepository>
+ <id>central</id>
+ <url>http://jcenter.bintray.com</url>
+ </pluginRepository>
+ <pluginRepository>
+ <id>bintray-kotlin-kotlinx</id>
+ <name>bintray</name>
+ <url>http://kotlin.bintray.com/kotlinx</url>
+ </pluginRepository>
+ <pluginRepository>
<id>bintray-kotlin-eap</id>
<name>bintray-kotlin-eap</name>
<url>http://dl.bintray.com/kotlin/kotlin-eap</url>