LockFreeLinkedList refactored
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
index 3a7b6e2..2ba8fa4 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
@@ -5,6 +5,10 @@
private typealias Node = LockFreeLinkedListNode
+internal const val UNDECIDED = 0
+internal const val SUCCESS = 1
+internal const val FAILURE = 2
+
/**
* Doubly-linked concurrent list node with remove support.
* Based on paper
@@ -41,24 +45,21 @@
private fun removed(): Removed =
removedRef ?: Removed(this).also { REMOVED_REF.lazySet(this, it) }
- abstract class CondAdd {
- internal lateinit var newNode: Node
- internal lateinit var oldNext: Node
+ abstract class CondAdd(val newNode: Node) {
+ lateinit var oldNext: Node
@Volatile
private var consensus: Int = UNDECIDED // status of operation
+
abstract fun isCondition(): Boolean
private companion object {
@JvmStatic
val CONSENSUS: AtomicIntegerFieldUpdater<CondAdd> =
- AtomicIntegerFieldUpdater.newUpdater(CondAdd::class.java, "consensus")
-
- const val UNDECIDED = 0
- const val SUCCESS = 1
- const val FAILURE = 2
+ AtomicIntegerFieldUpdater.newUpdater(CondAdd::class.java, "consensus")
}
- fun completeAdd(node: Node): Boolean {
+ // returns either SUCCESS or FAILURE
+ fun completeAdd(node: Node): Int {
// make decision on status
var consensus: Int
while (true) {
@@ -75,10 +76,14 @@
// only the thread the makes this update actually finishes add operation
if (success) newNode.finishAdd(oldNext)
}
- return success
+ return consensus
}
}
+ private inline fun makeCondAdd(node: Node, crossinline condition: () -> Boolean): CondAdd = object : CondAdd(node) {
+ override fun isCondition(): Boolean = condition()
+ }
+
val isRemoved: Boolean get() = _next is Removed
private val isFresh: Boolean get() = _next === this && prev === this
@@ -101,36 +106,31 @@
// ------ addFirstXXX ------
- private fun addFirstCC(node: Node, condAdd: CondAdd?): Boolean {
- require(node.isFresh)
- condAdd?.newNode = node
+ /**
+ * Adds first item to this list.
+ */
+ fun addFirst(node: Node) {
while (true) { // lock-free loop on next
val next = this.next as Node // this sentinel node is never removed
- PREV.lazySet(node, this)
- NEXT.lazySet(node, next)
- condAdd?.oldNext = next
- if (NEXT.compareAndSet(this, next, condAdd ?: node)) {
- // added successfully (linearized add) -- fixup the list
- return condAdd?.completeAdd(this) ?: run { node.finishAdd(next); true }
- }
+ if (addNext(node, next)) return
}
}
/**
- * Adds first item to this list.
- */
- fun addFirst(node: Node) { addFirstCC(node, null) }
-
- /**
* Adds first item to this list atomically if the [condition] is true.
*/
- inline fun addFirstIf(node: Node, crossinline condition: () -> Boolean): Boolean =
- addFirstCC(node, object : CondAdd() {
- override fun isCondition(): Boolean = condition()
- })
+ inline fun addFirstIf(node: Node, crossinline condition: () -> Boolean): Boolean {
+ val condAdd = makeCondAdd(node, condition)
+ while (true) { // lock-free loop on next
+ val next = this.next as Node // this sentinel node is never removed
+ when (tryCondAddNext(node, next, condAdd)) {
+ SUCCESS -> return true
+ FAILURE -> return false
+ }
+ }
+ }
fun addFirstIfEmpty(node: Node): Boolean {
- require(node.isFresh)
PREV.lazySet(node, this)
NEXT.lazySet(node, this)
if (!NEXT.compareAndSet(this, this, node)) return false // this is not an empty list!
@@ -141,40 +141,51 @@
// ------ addLastXXX ------
- private fun addLastCC(node: Node, condAdd: CondAdd?): Boolean {
- require(node.isFresh)
- condAdd?.newNode = node
+ /**
+ * Adds last item to this list.
+ */
+ fun addLast(node: Node) {
while (true) { // lock-free loop on prev.next
val prev = prevHelper() ?: continue
- PREV.lazySet(node, prev)
- NEXT.lazySet(node, this)
- condAdd?.oldNext = this
- if (NEXT.compareAndSet(prev, this, condAdd ?: node)) {
- // added successfully (linearized add) -- fixup the list
- return condAdd?.completeAdd(prev) ?: run { node.finishAdd(this); true }
- }
+ if (prev.addNext(node, this)) return
}
}
/**
- * Adds last item to this list.
- */
- fun addLast(node: Node) { addLastCC(node, null) }
-
- /**
* Adds last item to this list atomically if the [condition] is true.
*/
- inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean =
- addLastCC(node, object : CondAdd() {
- override fun isCondition(): Boolean = condition()
- })
+ inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean {
+ val condAdd = makeCondAdd(node, condition)
+ while (true) { // lock-free loop on prev.next
+ val prev = prevHelper() ?: continue
+ when (prev.tryCondAddNext(node, this, condAdd)) {
+ SUCCESS -> return true
+ FAILURE -> return false
+ }
+ }
+ }
inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean {
- require(node.isFresh)
while (true) { // lock-free loop on prev.next
val prev = prevHelper() ?: continue
if (!predicate(prev)) return false
- if (addAfterPrev(node, prev)) return true
+ if (prev.addNext(node, this)) return true
+ }
+ }
+
+ inline fun addLastIfPrevAndIf(
+ node: Node,
+ predicate: (Node) -> Boolean, // prev node predicate
+ crossinline condition: () -> Boolean // atomically checked condition
+ ): Boolean {
+ val condAdd = makeCondAdd(node, condition)
+ while (true) { // lock-free loop on prev.next
+ val prev = prevHelper() ?: continue
+ if (!predicate(prev)) return false
+ when (prev.tryCondAddNext(node, this, condAdd)) {
+ SUCCESS -> return true
+ FAILURE -> return false
+ }
}
}
@@ -185,15 +196,25 @@
return null
}
- private fun addAfterPrev(node: Node, prev: Node): Boolean {
- PREV.lazySet(node, prev)
- NEXT.lazySet(node, this)
- if (NEXT.compareAndSet(prev, this, node)) {
- // added successfully (linearized add) -- fixup the list
- node.finishAdd(this)
- return true
- }
- return false
+ // ------ addXXX util ------
+
+ private fun addNext(node: Node, next: Node): Boolean {
+ PREV.lazySet(node, this)
+ NEXT.lazySet(node, next)
+ if (!NEXT.compareAndSet(this, next, node)) return false
+ // added successfully (linearized add) -- fixup the list
+ node.finishAdd(next)
+ return true
+ }
+
+ // returns UNDECIDED, SUCCESS or FAILURE
+ private fun tryCondAddNext(node: Node, next: Node, condAdd: CondAdd): Int {
+ PREV.lazySet(node, this)
+ NEXT.lazySet(node, next)
+ condAdd.oldNext = next
+ if (!NEXT.compareAndSet(this, next, condAdd)) return UNDECIDED
+ // added operation successfully (linearized) -- complete it & fixup the list
+ return condAdd.completeAdd(this)
}
// ------ removeXXX ------
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
index 5998e08..d444a5d 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
@@ -31,7 +31,7 @@
launchReceiver()
val rnd = Random()
while (System.currentTimeMillis() < deadline) {
- when (rnd.nextInt(4)) {
+ when (rnd.nextInt(3)) {
0 -> { // cancel & restart sender
stopSender()
launchSender()
@@ -41,7 +41,6 @@
launchReceiver()
}
2 -> yield() // just yield (burn a little time)
- 3 -> delay(1L) // delay for more a bit
}
}
stopSender()