Multi-part atomic remove operation support for LockFreeLinkedList
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 1422db5..e011e2b 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -142,7 +142,7 @@
while (eventLoop!!.processNextEvent()) { /* just spin */ }
}
// now return result
- val state = getState()
+ val state = this.state
(state as? CompletedExceptionally)?.let { throw it.exception }
return state as T
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 05dbe72..3877fc8 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -113,10 +113,10 @@
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineOrReturn { cont ->
- val safe = CancellableContinuationImpl(cont, getParentJobOrAbort(cont))
- if (!holdCancellability) safe.initCancellability()
- block(safe)
- safe.getResult()
+ val cancellable = CancellableContinuationImpl(cont, getParentJobOrAbort(cont), active = true)
+ if (!holdCancellability) cancellable.initCancellability()
+ block(cancellable)
+ cancellable.getResult()
}
@@ -149,12 +149,11 @@
}
@PublishedApi
-internal class CancellableContinuationImpl<in T>(
- private val delegate: Continuation<T>,
- private val parentJob: Job?
-) : AbstractCoroutine<T>(delegate.context, active = true), CancellableContinuation<T> {
- // only updated from the thread that invoked suspendCancellableCoroutine
-
+internal open class CancellableContinuationImpl<in T>(
+ private val delegate: Continuation<T>,
+ private val parentJob: Job?,
+ active: Boolean
+) : AbstractCoroutine<T>(delegate.context, active), CancellableContinuation<T> {
@Volatile
private var decision = UNDECIDED
@@ -173,24 +172,24 @@
initParentJob(parentJob)
}
- fun getResult(): Any? {
+ @PublishedApi
+ internal fun getResult(): Any? {
val decision = this.decision // volatile read
when (decision) {
UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
YIELD -> return COROUTINE_SUSPENDED
}
// otherwise, afterCompletion was already invoked, and the result is in the state
- val state = getState()
+ val state = this.state
if (state is CompletedExceptionally) throw state.exception
return state
}
- override val isCancelled: Boolean
- get() = getState() is Cancelled
+ override val isCancelled: Boolean get() = state is Cancelled
override fun tryResume(value: T): Any? {
while (true) { // lock-free loop on state
- val state = getState() // atomic read
+ val state = this.state // atomic read
when (state) {
is Incomplete -> if (tryUpdateState(state, value)) return state
else -> return null // cannot resume -- not active anymore
@@ -200,7 +199,7 @@
override fun tryResumeWithException(exception: Throwable): Any? {
while (true) { // lock-free loop on state
- val state = getState() // atomic read
+ val state = this.state // atomic read
when (state) {
is Incomplete -> if (tryUpdateState(state, CompletedExceptionally(exception))) return state
else -> return null // cannot resume -- not active anymore
@@ -209,7 +208,7 @@
}
override fun completeResume(token: Any) {
- completeUpdateState(token, getState())
+ completeUpdateState(token, state)
}
@Suppress("UNCHECKED_CAST")
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
index e9623c0..0ef0ffa 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
@@ -62,7 +62,7 @@
final override fun resume(value: T) {
while (true) { // lock-free loop on state
- val state = getState() // atomic read
+ val state = this.state // atomic read
when (state) {
is Incomplete -> if (updateState(state, value)) return
is Cancelled -> return // ignore resumes on cancelled continuation
@@ -73,7 +73,7 @@
final override fun resumeWithException(exception: Throwable) {
while (true) { // lock-free loop on state
- val state = getState() // atomic read
+ val state = this.state // atomic read
when (state) {
is Incomplete -> if (updateState(state, CompletedExceptionally(exception))) return
is Cancelled -> {
@@ -92,8 +92,8 @@
// for nicer debugging
override fun toString(): String {
- val state = getState()
+ val state = this.state
val result = if (state is Incomplete) "" else "[$state]"
- return "${this::class.java.simpleName}{${describeState(state)}}$result@${Integer.toHexString(System.identityHashCode(this))}"
+ return "${this::class.java.simpleName}{${stateToString(state)}}$result@${Integer.toHexString(System.identityHashCode(this))}"
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index c3954d3..38356b6 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -121,14 +121,14 @@
context: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(context, active), Deferred<T> {
- override val isCompletedExceptionally: Boolean get() = getState() is CompletedExceptionally
- override val isCancelled: Boolean get() = getState() is Cancelled
+ override val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
+ override val isCancelled: Boolean get() = state is Cancelled
@Suppress("UNCHECKED_CAST")
suspend override fun await(): T {
// fast-path -- check state (avoid extra object creation)
while(true) { // lock-free loop on state
- val state = this.getState()
+ val state = this.state
if (state !is Incomplete) {
// already complete -- just return result
if (state is CompletedExceptionally) throw state.exception
@@ -143,7 +143,7 @@
@Suppress("UNCHECKED_CAST")
private suspend fun awaitSuspend(): T = suspendCancellableCoroutine { cont ->
cont.unregisterOnCompletion(invokeOnCompletion {
- val state = getState()
+ val state = this.state
check(state !is Incomplete)
if (state is CompletedExceptionally)
cont.resumeWithException(state.exception)
@@ -154,7 +154,7 @@
@Suppress("UNCHECKED_CAST")
override fun getCompleted(): T {
- val state = getState()
+ val state = this.state
check(state !is Incomplete) { "This deferred value has not completed yet" }
if (state is CompletedExceptionally) throw state.exception
return state as T
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 8e474da..ac587d2 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -16,10 +16,8 @@
package kotlinx.coroutines.experimental
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import kotlinx.coroutines.experimental.internal.*
import java.util.concurrent.Future
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
import kotlin.coroutines.experimental.Continuation
@@ -260,10 +258,12 @@
This state machine and its transition matrix are optimized for the common case when job is created in active
state (EMPTY_A) and at most one completion listener is added to it during its life-time.
+
+ Note, that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
*/
@Volatile
- private var state: Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
+ private var _state: Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
@Volatile
private var registration: Job.Registration? = null
@@ -271,9 +271,9 @@
protected companion object {
@JvmStatic
private val STATE: AtomicReferenceFieldUpdater<JobSupport, Any?> =
- AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "state")
+ AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "_state")
- fun describeState(state: Any?): String =
+ fun stateToString(state: Any?): String =
if (state is Incomplete)
if (state.isActive) "Active" else "New"
else "Completed"
@@ -299,10 +299,16 @@
/**
* Returns current state of this job.
*/
- protected fun getState(): Any? = state
+ protected val state: Any? get() {
+ while (true) { // lock-free helping loop
+ val state = _state
+ if (state !is OpDescriptor) return state
+ state.perform(this)
+ }
+ }
/**
- * Tries to update current [state][getState] of this job.
+ * Tries to update current [state] of this job.
*/
internal fun updateState(expect: Any, update: Any?): Boolean {
if (!tryUpdateState(expect, update)) return false
@@ -347,14 +353,20 @@
afterCompletion(update)
}
- final override val isActive: Boolean get() {
+ public final override val isActive: Boolean get() {
val state = this.state
return state is Incomplete && state.isActive
}
- final override val isCompleted: Boolean get() = state !is Incomplete
+ public final override val isCompleted: Boolean get() = state !is Incomplete
- final override fun start(): Boolean {
+ // this is for `select` operator. `isSelected` state means "not new" (== was started or already completed)
+ public val isSelected: Boolean get() {
+ val state = this.state
+ return state !is Incomplete || state.isActive
+ }
+
+ public final override fun start(): Boolean {
while (true) { // lock-free loop on state
when (startInternal(state)) {
0 -> return false
@@ -375,7 +387,7 @@
// LIST -- a list of completion handlers (either new or active)
state is NodeList -> {
if (state.isActive) return 0
- if (!NodeList.ACTIVE.compareAndSet(state, 0, 1)) return -1
+ if (!NodeList.ACTIVE.compareAndSet(state, null, NodeList.ACTIVE_STATE)) return -1
onStart()
return 1
}
@@ -384,13 +396,53 @@
}
}
+ internal fun describeStart(failureMarker: Any): AtomicDesc =
+ object : AtomicDesc() {
+ override fun prepare(op: AtomicOp): Any? {
+ while (true) { // lock-free loop on state
+ val state = this@JobSupport._state
+ when {
+ state === op -> return null // already in progress
+ state is OpDescriptor -> state.perform(this@JobSupport) // help
+ state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
+ if (STATE.compareAndSet(this@JobSupport, state, op)) return null // success
+ }
+ state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
+ if (state.isActive) return failureMarker
+ if (NodeList.ACTIVE.compareAndSet(state, null, op)) return null // success
+ }
+ else -> return failureMarker // not a new state
+ }
+ }
+ }
+
+ override fun complete(op: AtomicOp, failure: Any?) {
+ val success = failure == null
+ val state = this@JobSupport._state
+ when {
+ state === op -> {
+ if (STATE.compareAndSet(this@JobSupport, op, if (success) EmptyActive else EmptyNew)) {
+ if (success) onStart()
+ }
+ }
+ state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
+ if (state._active === op) {
+ if (NodeList.ACTIVE.compareAndSet(state, op, if (success) NodeList.ACTIVE_STATE else null)) {
+ if (success) onStart()
+ }
+ }
+ }
+ }
+ }
+ }
+
/**
* Override to provide the actual [start] action.
*/
protected open fun onStart() {}
final override fun getCompletionException(): Throwable {
- val state = getState()
+ val state = this.state
return when (state) {
is Incomplete -> throw IllegalStateException("Job has not completed yet")
is CompletedExceptionally -> state.exception
@@ -414,14 +466,14 @@
// EMPTY_NEW state -- no completion handlers, new
state === EmptyNew -> {
// try to promote it to list in new state
- STATE.compareAndSet(this, state, NodeList(active = 0))
+ STATE.compareAndSet(this, state, NodeList(active = false))
}
// SINGLE/SINGLE+ state -- one completion handler
state is JobNode<*> -> {
// try to promote it to list (SINGLE+ state)
- state.addFirstIfEmpty(NodeList(active = 1))
+ state.addOneIfEmpty(NodeList(active = true))
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
- val list = state.next() // either NodeList or somebody else won the race, updated state
+ val list = state.next // either NodeList or somebody else won the race, updated state
// just attempt converting it to list if state is still the same, then continue lock-free loop
STATE.compareAndSet(this, state, list)
}
@@ -498,25 +550,36 @@
?: InvokeOnCompletion(this, handler)
// for nicer debugging
- override fun toString(): String = "${this::class.java.simpleName}{${describeState(state)}}@${Integer.toHexString(System.identityHashCode(this))}"
+ override fun toString(): String = "${this::class.java.simpleName}{${stateToString(state)}}@${Integer.toHexString(System.identityHashCode(this))}"
/**
- * Interface for incomplete [state][getState] of a job.
+ * Interface for incomplete [state] of a job.
*/
public interface Incomplete {
val isActive: Boolean
}
private class NodeList(
- @Volatile
- var active: Int
+ active: Boolean
) : LockFreeLinkedListHead(), Incomplete {
- override val isActive: Boolean get() = active != 0
+ @Volatile
+ var _active: Any? = if (active) ACTIVE_STATE else null
+
+ override val isActive: Boolean get() {
+ while (true) { // helper loop for atomic ops
+ val active = this._active
+ if (active !is OpDescriptor) return active != null
+ active.perform(this)
+ }
+ }
companion object {
@JvmStatic
- val ACTIVE: AtomicIntegerFieldUpdater<NodeList> =
- AtomicIntegerFieldUpdater.newUpdater(NodeList::class.java, "active")
+ val ACTIVE: AtomicReferenceFieldUpdater<NodeList, Any?> =
+ AtomicReferenceFieldUpdater.newUpdater(NodeList::class.java, Any::class.java, "_active")
+
+ @JvmStatic
+ val ACTIVE_STATE = Symbol("ACTIVE_STATE")
}
override fun toString(): String = buildString {
@@ -533,7 +596,7 @@
}
/**
- * Class for a [state][getState] of a job that had completed exceptionally, including cancellation.
+ * Class for a [state] of a job that had completed exceptionally, including cancellation.
*
* @param cause the exceptional completion cause. If `cause` is null, then a [CancellationException]
* if created on first get from [exception] property.
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index b8a58e8..a335f0e 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -63,17 +63,17 @@
/**
* Returns non-null closed token if it is first in the queue.
*/
- protected val closedForReceive: Any? get() = queue.next() as? Closed<*>
+ protected val closedForReceive: Any? get() = queue.next as? Closed<*>
/**
* Returns non-null closed token if it is last in the queue.
*/
- protected val closedForSend: Any? get() = queue.prev() as? Closed<*>
+ protected val closedForSend: Any? get() = queue.prev as? Closed<*>
// ------ SendChannel ------
public final override val isClosedForSend: Boolean get() = closedForSend != null
- public final override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*> && isBufferFull
+ public final override val isFull: Boolean get() = queue.next !is ReceiveOrClosed<*> && isBufferFull
public final override suspend fun send(element: E) {
// fast path -- try offer non-blocking
@@ -152,7 +152,7 @@
// ------ ReceiveChannel ------
public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
- public final override val isEmpty: Boolean get() = queue.next() !is Send && isBufferEmpty
+ public final override val isEmpty: Boolean get() = queue.next !is Send && isBufferEmpty
@Suppress("UNCHECKED_CAST")
public final override suspend fun receive(): E {
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
new file mode 100644
index 0000000..5f5ace0
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
@@ -0,0 +1,71 @@
+package kotlinx.coroutines.experimental.internal
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+
+/**
+ * The most abstract operation that can be in process. Other threads observing an instance of this
+ * class in the fields of their object shall invoke [perform] to help.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public abstract class OpDescriptor {
+ /**
+ * Returns `null` is operation was performed successfully or some other
+ * object that indicates the failure reason.
+ */
+ abstract fun perform(affected: Any?): Any?
+}
+
+/**
+ * Descriptor for multi-word atomic operation.
+ * Based on paper
+ * ["A Practical Multi-Word Compare-and-Swap Operation"](http://www.cl.cam.ac.uk/research/srg/netos/papers/2002-casn.pdf)
+ * by Timothy L. Harris, Keir Fraser and Ian A. Pratt.
+ *
+ * Note: parts of atomic operation must be globally ordered. Otherwise, this implementation will produce
+ * [StackOverflowError].
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public abstract class AtomicOp : OpDescriptor() {
+ @Volatile
+ private var _consensus: Any? = UNDECIDED
+
+ companion object {
+ @JvmStatic
+ private val CONSENSUS: AtomicReferenceFieldUpdater<AtomicOp, Any?> =
+ AtomicReferenceFieldUpdater.newUpdater(AtomicOp::class.java, Any::class.java, "_consensus")
+
+ private val UNDECIDED: Any = Symbol("UNDECIDED")
+ }
+
+ val isDecided: Boolean get() = _consensus !== UNDECIDED
+
+ abstract fun prepare(): Any? // `null` if Ok, or failure reason
+ abstract fun complete(affected: Any?, failure: Any?) // failure != null if failed to prepare op
+
+ // returns `null` on success
+ final override fun perform(affected: Any?): Any? {
+ // make decision on status
+ var decision: Any?
+ while (true) {
+ decision = this._consensus
+ if (decision !== UNDECIDED) break
+ decision = prepare()
+ check(decision !== UNDECIDED)
+ if (CONSENSUS.compareAndSet(this, UNDECIDED, decision)) break
+ }
+ complete(affected, decision)
+ return decision
+ }
+}
+
+/**
+ * A part of multi-step atomic operation [AtomicOp].
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public abstract class AtomicDesc {
+ abstract fun prepare(op: AtomicOp): Any? // returns `null` if prepared successfully
+ abstract fun complete(op: AtomicOp, failure: Any?) // decision == null if success
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
index f6f14e4..10f95ee 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
@@ -16,7 +16,6 @@
package kotlinx.coroutines.experimental.internal
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
private typealias Node = LockFreeLinkedListNode
@@ -30,24 +29,45 @@
@PublishedApi
internal const val FAILURE = 2
+@PublishedApi
+internal val CONDITION_FALSE: Any = Symbol("CONDITION_FALSE")
+
+@PublishedApi
+internal val ALREADY_REMOVED: Any = Symbol("ALREADY_REMOVED")
+
+@PublishedApi
+internal val LIST_EMPTY: Any = Symbol("LIST_EMPTY")
+
+private val REMOVE_PREPARED: Any = Symbol("REMOVE_PREPARED")
+
+/**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public typealias RemoveFirstDesc<T> = LockFreeLinkedListNode.RemoveFirstDesc<T>
+
/**
* Doubly-linked concurrent list node with remove support.
* Based on paper
* ["Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap"](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.140.4693&rep=rep1&type=pdf)
* by Sundell and Tsigas.
- * The instance of this class serves both as list head/tail sentinel and as the list item.
- * Sentinel node should be never removed.
+ *
+ * Important notes:
+ * * The instance of this class serves both as list head/tail sentinel and as the list item.
+ * Sentinel node should be never removed.
+ * * There are no operations to add items to left side of the list, only to the end (right side), because we cannot
+ * efficiently linearize them with atomic multi-step head-removal operations. In short,
+ * support for [describeRemoveFirst] operation precludes ability to add items at the beginning.
*
* @suppress **This is unstable API and it is subject to change.**
*/
@Suppress("LeakingThis")
public open class LockFreeLinkedListNode {
@Volatile
- private var _next: Any = this // DoubleLinkedNode | Removed | CondAdd
+ private var _next: Any = this // DoubleLinkedNode | Removed | OpDescriptor
@Volatile
- private var prev: Any = this // DoubleLinkedNode | Removed
+ private var _prev: Any = this // DoubleLinkedNode | Removed
@Volatile
- private var removedRef: Removed? = null // lazily cached removed ref to this
+ private var _removedRef: Removed? = null // lazily cached removed ref to this
private companion object {
@JvmStatic
@@ -55,114 +75,70 @@
AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Any::class.java, "_next")
@JvmStatic
val PREV: AtomicReferenceFieldUpdater<Node, Any> =
- AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Any::class.java, "prev")
+ AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Any::class.java, "_prev")
@JvmStatic
val REMOVED_REF: AtomicReferenceFieldUpdater<Node, Removed?> =
- AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Removed::class.java, "removedRef")
- }
-
- private class Removed(val ref: Node) {
- override fun toString(): String = "Removed[$ref]"
+ AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Removed::class.java, "_removedRef")
}
private fun removed(): Removed =
- removedRef ?: Removed(this).also { REMOVED_REF.lazySet(this, it) }
+ _removedRef ?: Removed(this).also { REMOVED_REF.lazySet(this, it) }
@PublishedApi
- internal abstract class CondAdd(val newNode: Node) {
+ internal abstract class CondAddOp(val newNode: Node) : AtomicOp() {
lateinit var oldNext: Node
- @Volatile
- private var consensus: Int = UNDECIDED // status of operation
- abstract fun isCondition(): Boolean
-
- private companion object {
- @JvmStatic
- val CONSENSUS: AtomicIntegerFieldUpdater<CondAdd> =
- AtomicIntegerFieldUpdater.newUpdater(CondAdd::class.java, "consensus")
- }
-
- // returns either SUCCESS or FAILURE
- fun completeAdd(node: Node): Int {
- // make decision on status
- var consensus: Int
- while (true) {
- consensus = this.consensus
- if (consensus != UNDECIDED) break
- val proposal = if (isCondition()) SUCCESS else FAILURE
- if (CONSENSUS.compareAndSet(this, UNDECIDED, proposal)) {
- consensus = proposal
- break
- }
- }
- val success = consensus == SUCCESS
- if (NEXT.compareAndSet(node, this, if (success) newNode else oldNext)) {
+ override fun complete(affected: Any?, failure: Any?) {
+ affected as Node // type assertion
+ val success = failure == null
+ val update = if (success) newNode else oldNext
+ if (NEXT.compareAndSet(affected, this, update)) {
// only the thread the makes this update actually finishes add operation
if (success) newNode.finishAdd(oldNext)
}
- return consensus
}
}
@PublishedApi
- internal inline fun makeCondAdd(node: Node, crossinline condition: () -> Boolean): CondAdd = object : CondAdd(node) {
- override fun isCondition(): Boolean = condition()
- }
+ internal inline fun makeCondAddOp(node: Node, crossinline condition: () -> Boolean): CondAddOp =
+ object : CondAddOp(node) {
+ override fun prepare(): Any? = if (condition()) null else CONDITION_FALSE
+ }
- public val isRemoved: Boolean get() = _next is Removed
+ public val isRemoved: Boolean get() = next is Removed
- private val isFresh: Boolean get() = _next === this && prev === this
-
- @PublishedApi
- internal val next: Any get() {
- while (true) { // helper loop on _next
+ // LINEARIZABLE.
+ public val next: Any get() {
+ while (true) { // operation helper loop on _next
val next = this._next
- if (next !is CondAdd) return next
- next.completeAdd(this)
+ if (next !is OpDescriptor) return next
+ next.perform(this)
}
}
- public fun next(): Node = next.unwrap()
-
- public fun prev(): Node {
+ // LINEARIZABLE. Note: use it on sentinel (never removed) node only
+ public val prev: Node get() {
while (true) {
- prevHelper()?.let { return it.unwrap() }
+ val prev = this._prev as Node // this sentinel node is never removed
+ if (prev.next === this) return prev
+ helpInsert(prev)
}
}
- // ------ addFirstXXX ------
+ // ------ addOneIfEmpty ------
- /**
- * Adds first item to this list.
- */
- public fun addFirst(node: Node) {
- while (true) { // lock-free loop on next
- val next = this.next as Node // this sentinel node is never removed
- if (addNext(node, next)) return
- }
- }
-
- /**
- * Adds first item to this list atomically if the [condition] is true.
- */
- public inline fun addFirstIf(node: Node, crossinline condition: () -> Boolean): Boolean {
- val condAdd = makeCondAdd(node, condition)
- while (true) { // lock-free loop on next
- val next = this.next as Node // this sentinel node is never removed
- when (tryCondAddNext(node, next, condAdd)) {
- SUCCESS -> return true
- FAILURE -> return false
- }
- }
- }
-
- public fun addFirstIfEmpty(node: Node): Boolean {
+ public fun addOneIfEmpty(node: Node): Boolean {
PREV.lazySet(node, this)
NEXT.lazySet(node, this)
- if (!NEXT.compareAndSet(this, this, node)) return false // this is not an empty list!
- // added successfully (linearized add) -- fixup the list
- node.finishAdd(this)
- return true
+ while (true) {
+ val next = next
+ if (next !== this) return false // this is not an empty list!
+ if (NEXT.compareAndSet(this, this, node)) {
+ // added successfully (linearized add) -- fixup the list
+ node.finishAdd(this)
+ return true
+ }
+ }
}
// ------ addLastXXX ------
@@ -172,7 +148,7 @@
*/
public fun addLast(node: Node) {
while (true) { // lock-free loop on prev.next
- val prev = prevHelper() ?: continue
+ val prev = prev
if (prev.addNext(node, this)) return
}
}
@@ -181,9 +157,9 @@
* Adds last item to this list atomically if the [condition] is true.
*/
public inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean {
- val condAdd = makeCondAdd(node, condition)
+ val condAdd = makeCondAddOp(node, condition)
while (true) { // lock-free loop on prev.next
- val prev = prevHelper() ?: continue
+ val prev = prev
when (prev.tryCondAddNext(node, this, condAdd)) {
SUCCESS -> return true
FAILURE -> return false
@@ -193,7 +169,7 @@
public inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean {
while (true) { // lock-free loop on prev.next
- val prev = prevHelper() ?: continue
+ val prev = prev
if (!predicate(prev)) return false
if (prev.addNext(node, this)) return true
}
@@ -204,9 +180,9 @@
predicate: (Node) -> Boolean, // prev node predicate
crossinline condition: () -> Boolean // atomically checked condition
): Boolean {
- val condAdd = makeCondAdd(node, condition)
+ val condAdd = makeCondAddOp(node, condition)
while (true) { // lock-free loop on prev.next
- val prev = prevHelper() ?: continue
+ val prev = prev
if (!predicate(prev)) return false
when (prev.tryCondAddNext(node, this, condAdd)) {
SUCCESS -> return true
@@ -215,14 +191,6 @@
}
}
- @PublishedApi
- internal fun prevHelper(): Node? {
- val prev = this.prev as Node // this sentinel node is never removed
- if (prev.next === this) return prev
- helpInsert(prev)
- return null
- }
-
// ------ addXXX util ------
@PublishedApi
@@ -237,13 +205,13 @@
// returns UNDECIDED, SUCCESS or FAILURE
@PublishedApi
- internal fun tryCondAddNext(node: Node, next: Node, condAdd: CondAdd): Int {
+ internal fun tryCondAddNext(node: Node, next: Node, condAdd: CondAddOp): Int {
PREV.lazySet(node, this)
NEXT.lazySet(node, next)
condAdd.oldNext = next
if (!NEXT.compareAndSet(this, next, condAdd)) return UNDECIDED
// added operation successfully (linearized) -- complete it & fixup the list
- return condAdd.completeAdd(this)
+ return if (condAdd.perform(this) == null) SUCCESS else FAILURE
}
// ------ removeXXX ------
@@ -255,40 +223,165 @@
while (true) { // lock-free loop on next
val next = this.next
if (next is Removed) return false // was already removed -- don't try to help (original thread will take care)
+ check(next !== this) // sanity check -- can be true for sentinel nodes only, but they are never removed
if (NEXT.compareAndSet(this, next, (next as Node).removed())) {
// was removed successfully (linearized remove) -- fixup the list
- helpDelete()
- next.helpInsert(prev.unwrap())
+ finishRemove(next)
return true
}
}
}
- public fun removeFirstOrNull(): Node? {
- while (true) { // try to linearize
- val first = next()
- if (first == this) return null
- if (first.remove()) return first
+ public open fun describeRemove() : AtomicDesc? {
+ if (isRemoved) return null // fast path if was already removed
+ return object : AbstractAtomicDesc() {
+ override val affectedNode: Node? get() = this@LockFreeLinkedListNode
+ override var originalNext: Node? = null
+ override fun failure(affected: Node, next: Any): Any? =
+ if (next is Removed) ALREADY_REMOVED else null
+ override fun onPrepare(affected: Node, next: Node): Boolean {
+ originalNext = next
+ return true
+ }
+ override fun updatedNext(next: Node) = next.removed()
+ override fun finishOnSuccess(affected: Node, next: Node) = finishRemove(next)
}
}
+ public fun removeFirstOrNull(): Node? {
+ while (true) { // try to linearize
+ val first = next as Node
+ if (first === this) return null
+ if (first.remove()) return first
+ first.helpDelete() // must help delete, or loose lock-freedom
+ }
+ }
+
+ public fun describeRemoveFirst(): RemoveFirstDesc<Node> = RemoveFirstDesc(this)
+
public inline fun <reified T> removeFirstIfIsInstanceOf(): T? {
while (true) { // try to linearize
- val first = next()
- if (first == this) return null
+ val first = next as Node
+ if (first === this) return null
if (first !is T) return null
if (first.remove()) return first
+ first.helpDelete() // must help delete, or loose lock-freedom
}
}
// just peek at item when predicate is true
public inline fun <reified T> removeFirstIfIsInstanceOfOrPeekIf(predicate: (T) -> Boolean): T? {
while (true) { // try to linearize
- val first = next()
- if (first == this) return null
+ val first = next as Node
+ if (first === this) return null
if (first !is T) return null
if (predicate(first)) return first // just peek when predicate is true
if (first.remove()) return first
+ first.helpDelete() // must help delete, or loose lock-freedom
+ }
+ }
+
+ // ------ multi-word atomic operations helpers ------
+
+ public open class RemoveFirstDesc<T>(val queue: Node) : AbstractAtomicDesc() {
+ @Suppress("UNCHECKED_CAST")
+ public val result: T get() = affectedNode!! as T
+
+ final override fun takeAffectedNode(): Node = queue.next as Node
+ final override var affectedNode: Node? = null
+ final override var originalNext: Node? = null
+
+ // check node predicates here, must signal failure if affect is not of type T
+ protected override fun failure(affected: Node, next: Any): Any? =
+ if (affected === queue) LIST_EMPTY else null
+ // validate the resulting node (return false if it should be deleted)
+ protected open fun validatePrepared(node: T): Boolean = true // false means remove node & retry
+
+ final override fun retry(affected: Node, next: Any): Boolean {
+ if (next !is Removed) return false
+ affected.helpDelete() // must help delete, or loose lock-freedom
+ return true
+ }
+ @Suppress("UNCHECKED_CAST")
+ final override fun onPrepare(affected: Node, next: Node): Boolean {
+ check(affected !is LockFreeLinkedListHead)
+ if (!validatePrepared(affected as T)) return false
+ affectedNode = affected
+ originalNext = next
+ return true
+ }
+ final override fun updatedNext(next: Node): Any = next.removed()
+ final override fun finishOnSuccess(affected: Node, next: Node) = affected.finishRemove(next)
+ }
+
+ public abstract class AbstractAtomicDesc : AtomicDesc() {
+ protected abstract val affectedNode: Node?
+ protected abstract val originalNext: Node?
+ protected open fun takeAffectedNode(): Node = affectedNode!!
+ protected open fun failure(affected: Node, next: Any): Any? = null // next: Node | Removed
+ protected open fun retry(affected: Node, next: Any): Boolean = false // next: Node | Removed
+ protected abstract fun onPrepare(affected: Node, next: Node): Boolean // false means: remove node & retry
+ protected abstract fun updatedNext(next: Node): Any
+ protected abstract fun finishOnSuccess(affected: Node, next: Node)
+
+ // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation
+ // It inserts "op" descriptor of when "op" status is still undecided (rolls back otherwise)
+ private class PrepareOp(
+ val next: Node,
+ val op: AtomicOp,
+ val desc: AbstractAtomicDesc
+ ) : OpDescriptor() {
+ override fun perform(affected: Any?): Any? {
+ affected as Node // type assertion
+ if (!desc.onPrepare(affected, next)) return REMOVE_PREPARED
+ check(desc.affectedNode === affected)
+ check(desc.originalNext === next)
+ val update: Any = if (op.isDecided) next else op // restore if decision was already reached
+ NEXT.compareAndSet(affected, this, update)
+ return null // ok
+ }
+ }
+
+ final override fun prepare(op: AtomicOp): Any? {
+ while (true) { // lock free loop on next
+ val affected = takeAffectedNode()
+ // read its original next pointer first
+ val next = affected._next
+ // then see if already reached consensus on overall operation
+ if (op.isDecided) return null // already decided -- go to next desc
+ if (next === op) return null // already in process of operation -- all is good
+ if (next is OpDescriptor) {
+ // some other operation is in process -- help it
+ next.perform(affected)
+ continue // and retry
+ }
+ // next: Node | Removed
+ val failure = failure(affected, next)
+ if (failure != null) return failure // signal failure
+ if (retry(affected, next)) continue // retry operation
+ val prepareOp = PrepareOp(next as Node, op, this)
+ if (NEXT.compareAndSet(affected, next, prepareOp)) {
+ // prepared -- complete preparations
+ val prepFail = prepareOp.perform(affected) ?: return null // prepared successfully
+ check(prepFail === REMOVE_PREPARED) // the only way for prepare to fail
+ if (NEXT.compareAndSet(affected, prepareOp, next.removed())) {
+ affected.helpDelete()
+ }
+ }
+ }
+ }
+
+ final override fun complete(op: AtomicOp, failure: Any?) {
+ val success = failure == null
+ val update = if (success) updatedNext(originalNext!!) else originalNext
+ val affectedNode = affectedNode
+ if (affectedNode == null) {
+ check(!success)
+ return
+ }
+ if (NEXT.compareAndSet(affectedNode, op, update)) {
+ if (success) finishOnSuccess(affectedNode, originalNext!!)
+ }
}
}
@@ -296,7 +389,7 @@
private fun finishAdd(next: Node) {
while (true) {
- val nextPrev = next.prev
+ val nextPrev = next._prev
if (nextPrev is Removed || this.next !== next) return // next was removed, remover fixes up links
if (PREV.compareAndSet(next, nextPrev, this)) {
if (this.next is Removed) {
@@ -308,16 +401,22 @@
}
}
+ private fun finishRemove(next: Node) {
+ helpDelete()
+ next.helpInsert(_prev.unwrap())
+ }
+
private fun markPrev(): Node {
while (true) { // lock-free loop on prev
- val prev = this.prev
+ val prev = this._prev
if (prev is Removed) return prev.ref
if (PREV.compareAndSet(this, prev, (prev as Node).removed())) return prev
}
}
// fixes next links to the left of this node
- private fun helpDelete() {
+ @PublishedApi
+ internal fun helpDelete() {
var last: Node? = null // will set to the node left of prev when found
var prev: Node = markPrev()
var next: Node = (this._next as Removed).ref
@@ -338,7 +437,7 @@
prev = last
last = null
} else {
- prev = prev.prev.unwrap()
+ prev = prev._prev.unwrap()
}
continue
}
@@ -368,11 +467,11 @@
prev = last
last = null
} else {
- prev = prev.prev.unwrap()
+ prev = prev._prev.unwrap()
}
continue
}
- val oldPrev = this.prev
+ val oldPrev = this._prev
if (oldPrev is Removed) return // this node was removed, too -- its remover will take care
if (prevNext !== this) {
// need to fixup next
@@ -382,50 +481,56 @@
}
if (oldPrev === prev) return // it is already linked as needed
if (PREV.compareAndSet(this, oldPrev, prev)) {
- if (prev.prev !is Removed) return // finish only if prev was not concurrently removed
+ if (prev._prev !is Removed) return // finish only if prev was not concurrently removed
}
}
}
- private fun Any.unwrap(): Node = if (this is Removed) ref else this as Node
-
internal fun validateNode(prev: Node, next: Node) {
- check(prev === this.prev)
- check(next === this.next)
+ check(prev === this._prev)
+ check(next === this._next)
}
}
+private class Removed(val ref: Node) {
+ override fun toString(): String = "Removed[$ref]"
+}
+
+@PublishedApi
+internal fun Any.unwrap(): Node = if (this is Removed) ref else this as Node
+
/**
* Head (sentinel) item of the linked list that is never removed.
*
* @suppress **This is unstable API and it is subject to change.**
*/
public open class LockFreeLinkedListHead : LockFreeLinkedListNode() {
- public val isEmpty: Boolean get() = next() == this
+ public val isEmpty: Boolean get() = next === this
/**
* Iterates over all elements in this list of a specified type.
*/
public inline fun <reified T : Node> forEach(block: (T) -> Unit) {
- var cur: Node = next()
+ var cur: Node = next as Node
while (cur != this) {
if (cur is T) block(cur)
- cur = cur.next()
+ cur = cur.next.unwrap()
}
}
// just a defensive programming -- makes sure that list head sentinel is never removed
public final override fun remove() = throw UnsupportedOperationException()
+ public final override fun describeRemove(): AtomicDesc? = throw UnsupportedOperationException()
internal fun validate() {
var prev: Node = this
- var cur: Node = next()
+ var cur: Node = next as Node
while (cur != this) {
- val next = cur.next()
+ val next = cur.next.unwrap()
cur.validateNode(prev, next)
prev = cur
cur = next
}
- validateNode(prev, next())
+ validateNode(prev, next as Node)
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
new file mode 100644
index 0000000..d34f865
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.internal
+
+internal class Symbol(val symbol: String) {
+ override fun toString(): String = symbol
+}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicRemoveStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicRemoveStressTest.kt
new file mode 100644
index 0000000..456681d
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicRemoveStressTest.kt
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.internal
+
+import org.junit.Assert.*
+import org.junit.Test
+import java.util.*
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.concurrent.thread
+
+/**
+ * This stress test has 4 threads adding randomly first to the list and them immediately undoing
+ * this addition by remove, and 4 threads trying to remove nodes from two lists simultaneously (atomically).
+ */
+class LockFreeLinkedListAtomicRemoveStressTest {
+ data class IntNode(val i: Int) : LockFreeLinkedListNode()
+
+ val threads = mutableListOf<Thread>()
+ val nLists = 4
+ val nRemoverThreads = 4
+ val timeout = 5000L
+ val completedAdder = AtomicInteger()
+ val completedRemover = AtomicInteger()
+
+ val lists = Array(nLists) { LockFreeLinkedListHead() }
+
+ val undone = AtomicInteger()
+ val missed = AtomicInteger()
+ val removed = AtomicInteger()
+
+ @Test
+ fun testStress() {
+ val deadline = System.currentTimeMillis() + timeout
+ repeat(nLists) { threadId ->
+ threads += thread(start = false, name = "adder-$threadId") {
+ val rnd = Random()
+ val list = lists[threadId]
+ while (System.currentTimeMillis() < deadline) {
+ var node: IntNode? = IntNode(threadId)
+ when (rnd.nextInt(3)) {
+ 0 -> list.addLast(node!!)
+ 1 -> assertTrue(list.addLastIf(node!!, { true })) // just to test conditional add
+ 2 -> { // just to test failed conditional add and burn some time
+ assertFalse(list.addLastIf(node!!, { false }))
+ node = null
+ }
+ else -> error("Cannot happen")
+ }
+ if (node == null) continue
+ when (rnd.nextInt(3)) {
+ 0 -> {} // nothing -- be quick
+ 1 -> {
+ // burn some time
+ Thread.yield()
+ }
+ 2 -> {
+ // burn more time
+ Thread.sleep(1)
+ }
+ else -> error("Cannot happen")
+ }
+ // undo add
+ if (node.remove())
+ undone.incrementAndGet()
+ else
+ missed.incrementAndGet()
+ }
+ completedAdder.incrementAndGet()
+ }
+ }
+ repeat(nRemoverThreads) { threadId ->
+ threads += thread(start = false, name = "remover-$threadId") {
+ val rnd = Random()
+ while (System.currentTimeMillis() < deadline) {
+ val idx1 = rnd.nextInt(nLists - 1)
+ val idx2 = idx1 + 1 + rnd.nextInt(nLists - idx1 - 1)
+ check(idx1 < idx2) // that is our global order
+ val list1 = lists[idx1]
+ val list2 = lists[idx2]
+ val remove1 = list1.describeRemoveFirst()
+ val remove2 = list2.describeRemoveFirst()
+ val op = object : AtomicOp() {
+ override fun prepare(): Any? = remove1.prepare(this) ?: remove2.prepare(this)
+ override fun complete(affected: Any?, failure: Any?) {
+ remove1.complete(this, failure)
+ remove2.complete(this, failure)
+ }
+ }
+ val success = op.perform(null) == null
+ if (success) removed.addAndGet(2)
+
+ }
+ completedRemover.incrementAndGet()
+ }
+ }
+ threads.forEach { it.start() }
+ threads.forEach { it.join() }
+ println("Completed successfully ${completedAdder.get()} adder threads")
+ println("Completed successfully ${completedRemover.get()} remover threads")
+ println(" Adders undone ${undone.get()} node additions")
+ println(" Adders missed ${missed.get()} nodes")
+ println("Remover removed ${removed.get()} nodes")
+ assertEquals(nLists, completedAdder.get())
+ assertEquals(nRemoverThreads, completedRemover.get())
+ assertEquals(missed.get(), removed.get())
+ assertTrue(undone.get() > 0)
+ assertTrue(missed.get() > 0)
+ lists.forEach { it.validate() }
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListLongStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListLongStressTest.kt
index f5baa81..5daf1c5 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListLongStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListLongStressTest.kt
@@ -28,31 +28,23 @@
* stressed is long.
*/
class LockFreeLinkedListLongStressTest {
- private data class IntNode(val i: Int) : LockFreeLinkedListNode()
- private val list = LockFreeLinkedListHead()
+ data class IntNode(val i: Int) : LockFreeLinkedListNode()
+ val list = LockFreeLinkedListHead()
val threads = mutableListOf<Thread>()
val nAdded = 10_000_000
- val nAddThreads = 2
+ val nAddThreads = 4 // must be power of 2 (!!!)
val nRemoveThreads = 6
val removeProbability = 0.2
- val workingAdders = AtomicInteger(nAddThreads * 2)
+ val workingAdders = AtomicInteger(nAddThreads)
fun shallRemove(i: Int) = i and 63 != 42
@Test
fun testStress() {
for (j in 0 until nAddThreads)
- threads += thread(start = false, name = "firstAdder-$j") {
+ threads += thread(start = false, name = "adder-$j") {
for (i in j until nAdded step nAddThreads) {
- list.addFirst(IntNode(i))
- }
- println("${Thread.currentThread().name} completed")
- workingAdders.decrementAndGet()
- }
- for (j in 0 until nAddThreads)
- threads += thread(start = false, name = "lastAdder-$j") {
- for (i in -j-1 downTo -nAdded step nAddThreads) {
list.addLast(IntNode(i))
}
println("${Thread.currentThread().name} completed")
@@ -80,7 +72,7 @@
println("Verify result")
list.validate()
val expected = buildIterator {
- for (i in nAdded - 1 downTo -nAdded)
+ for (i in 0 until nAdded)
if (!shallRemove(i))
yield(i)
}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListShortStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListShortStressTest.kt
index 9308526..a142117 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListShortStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListShortStressTest.kt
@@ -16,21 +16,20 @@
package kotlinx.coroutines.experimental.internal
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertTrue
+import org.junit.Assert.*
import org.junit.Test
import java.util.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
/**
- * This stress test has 6 threads adding randomly first or last item to the list and them immediately undoing
+ * This stress test has 6 threads adding randomly first to the list and them immediately undoing
* this addition by remove, and 4 threads removing first node. The resulting list that is being
* stressed is very short.
*/
class LockFreeLinkedListShortStressTest {
- private data class IntNode(val i: Int) : LockFreeLinkedListNode()
- private val list = LockFreeLinkedListHead()
+ data class IntNode(val i: Int) : LockFreeLinkedListNode()
+ val list = LockFreeLinkedListHead()
val threads = mutableListOf<Thread>()
val nAdderThreads = 6
@@ -50,17 +49,21 @@
threads += thread(start = false, name = "adder-$threadId") {
val rnd = Random()
while (System.currentTimeMillis() < deadline) {
- val node = IntNode(threadId)
- when (rnd.nextInt(4)) {
- 0 -> list.addFirst(node)
- 1 -> list.addLast(node)
- 2 -> list.addFirstIf(node, { true }) // just to test conditional add
- 3 -> list.addLastIf(node, { true })
+ var node: IntNode? = IntNode(threadId)
+ when (rnd.nextInt(3)) {
+ 0 -> list.addLast(node!!)
+ 1 -> assertTrue(list.addLastIf(node!!, { true })) // just to test conditional add
+ 2 -> { // just to test failed conditional add
+ assertFalse(list.addLastIf(node!!, { false }))
+ node = null
+ }
}
- if (node.remove())
- undone.incrementAndGet()
- else
- missed.incrementAndGet()
+ if (node != null) {
+ if (node.remove())
+ undone.incrementAndGet()
+ else
+ missed.incrementAndGet()
+ }
}
completedAdder.incrementAndGet()
}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListTest.kt
index eba745b..2688256 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListTest.kt
@@ -20,30 +20,7 @@
import org.junit.Test
class LockFreeLinkedListTest {
- private data class IntNode(val i: Int) : LockFreeLinkedListNode()
-
- @Test
- fun testSimpleAddFirst() {
- val list = LockFreeLinkedListHead()
- assertContents(list)
- val n1 = IntNode(1).apply { list.addFirst(this) }
- assertContents(list, 1)
- val n2 = IntNode(2).apply { list.addFirst(this) }
- assertContents(list, 2, 1)
- val n3 = IntNode(3).apply { list.addFirst(this) }
- assertContents(list, 3, 2, 1)
- val n4 = IntNode(4).apply { list.addFirst(this) }
- assertContents(list, 4, 3, 2, 1)
- assertTrue(n1.remove())
- assertContents(list, 4, 3, 2)
- assertTrue(n3.remove())
- assertFalse(n3.remove())
- assertContents(list, 4, 2)
- assertTrue(n4.remove())
- assertContents(list, 2)
- assertTrue(n2.remove())
- assertContents(list)
- }
+ data class IntNode(val i: Int) : LockFreeLinkedListNode()
@Test
fun testSimpleAddLast() {
@@ -76,10 +53,61 @@
assertContents(list, 1)
assertFalse(list.addLastIf(IntNode(2)) { false })
assertContents(list, 1)
- assertTrue(list.addFirstIf(IntNode(3)) { true })
- assertContents(list, 3, 1)
- assertFalse(list.addFirstIf(IntNode(4)) { false })
- assertContents(list, 3, 1)
+ assertTrue(list.addLastIf(IntNode(3)) { true })
+ assertContents(list, 1, 3)
+ assertFalse(list.addLastIf(IntNode(4)) { false })
+ assertContents(list, 1, 3)
+ }
+
+ @Test
+ fun testRemoveTwoAtomic() {
+ val list = LockFreeLinkedListHead()
+ val n1 = IntNode(1).apply { list.addLast(this) }
+ val n2 = IntNode(2).apply { list.addLast(this) }
+ assertContents(list, 1, 2)
+ assertFalse(n1.isRemoved)
+ assertFalse(n2.isRemoved)
+ val remove1Desc = n1.describeRemove()!!
+ val remove2Desc = n2.describeRemove()!!
+ val operation = object : AtomicOp() {
+ override fun prepare(): Any? = remove1Desc.prepare(this) ?: remove2Desc.prepare(this)
+ override fun complete(affected: Any?, failure: Any?) {
+ remove1Desc.complete(this, failure)
+ remove2Desc.complete(this, failure)
+ }
+ }
+ assertTrue(operation.perform(null) == null)
+ assertTrue(n1.isRemoved)
+ assertTrue(n2.isRemoved)
+ assertContents(list)
+ }
+
+ @Test
+ fun testAtomicOpsSingle() {
+ val list = LockFreeLinkedListHead()
+ assertContents(list)
+ val n1 = IntNode(1).also { list.addLast(it) }
+ assertContents(list, 1)
+ val n2 = IntNode(2).also { list.addLast(it) }
+ assertContents(list, 1, 2)
+ val n3 = IntNode(3).also { list.addLast(it) }
+ assertContents(list, 1, 2, 3)
+ val n4 = IntNode(4).also { list.addLast(it) }
+ assertContents(list, 1, 2, 3, 4)
+ single(n3.describeRemove()!!)
+ assertContents(list, 1, 2, 4)
+ assertTrue(n3.describeRemove() == null)
+ single(list.describeRemoveFirst()!!)
+ assertContents(list, 2, 4)
+ assertTrue(n1.describeRemove() == null)
+ }
+
+ private fun single(part: AtomicDesc) {
+ val operation = object : AtomicOp() {
+ override fun prepare(): Any? = part.prepare(this)
+ override fun complete(affected: Any?, failure: Any?) = part.complete(this, failure)
+ }
+ assertTrue(operation.perform(null) == null)
}
private fun assertContents(list: LockFreeLinkedListHead, vararg expected: Int) {