Multi-part atomic remove operation support for LockFreeLinkedList
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 1422db5..e011e2b 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -142,7 +142,7 @@
             while (eventLoop!!.processNextEvent()) { /* just spin */ }
         }
         // now return result
-        val state = getState()
+        val state = this.state
         (state as? CompletedExceptionally)?.let { throw it.exception }
         return state as T
     }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 05dbe72..3877fc8 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -113,10 +113,10 @@
     crossinline block: (CancellableContinuation<T>) -> Unit
 ): T =
     suspendCoroutineOrReturn { cont ->
-        val safe = CancellableContinuationImpl(cont, getParentJobOrAbort(cont))
-        if (!holdCancellability) safe.initCancellability()
-        block(safe)
-        safe.getResult()
+        val cancellable = CancellableContinuationImpl(cont, getParentJobOrAbort(cont), active = true)
+        if (!holdCancellability) cancellable.initCancellability()
+        block(cancellable)
+        cancellable.getResult()
     }
 
 
@@ -149,12 +149,11 @@
 }
 
 @PublishedApi
-internal class CancellableContinuationImpl<in T>(
-        private val delegate: Continuation<T>,
-        private val parentJob: Job?
-) : AbstractCoroutine<T>(delegate.context, active = true), CancellableContinuation<T> {
-    // only updated from the thread that invoked suspendCancellableCoroutine
-
+internal open class CancellableContinuationImpl<in T>(
+    private val delegate: Continuation<T>,
+    private val parentJob: Job?,
+    active: Boolean
+) : AbstractCoroutine<T>(delegate.context, active), CancellableContinuation<T> {
     @Volatile
     private var decision = UNDECIDED
 
@@ -173,24 +172,24 @@
         initParentJob(parentJob)
     }
 
-    fun getResult(): Any? {
+    @PublishedApi
+    internal fun getResult(): Any? {
         val decision = this.decision // volatile read
         when (decision) {
             UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
             YIELD -> return COROUTINE_SUSPENDED
         }
         // otherwise, afterCompletion was already invoked, and the result is in the state
-        val state = getState()
+        val state = this.state
         if (state is CompletedExceptionally) throw state.exception
         return state
     }
 
-    override val isCancelled: Boolean
-        get() = getState() is Cancelled
+    override val isCancelled: Boolean get() = state is Cancelled
 
     override fun tryResume(value: T): Any? {
         while (true) { // lock-free loop on state
-            val state = getState() // atomic read
+            val state = this.state // atomic read
             when (state) {
                 is Incomplete -> if (tryUpdateState(state, value)) return state
                 else -> return null // cannot resume -- not active anymore
@@ -200,7 +199,7 @@
 
     override fun tryResumeWithException(exception: Throwable): Any? {
         while (true) { // lock-free loop on state
-            val state = getState() // atomic read
+            val state = this.state // atomic read
             when (state) {
                 is Incomplete -> if (tryUpdateState(state, CompletedExceptionally(exception))) return state
                 else -> return null // cannot resume -- not active anymore
@@ -209,7 +208,7 @@
     }
 
     override fun completeResume(token: Any) {
-        completeUpdateState(token, getState())
+        completeUpdateState(token, state)
     }
 
     @Suppress("UNCHECKED_CAST")
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
index e9623c0..0ef0ffa 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
@@ -62,7 +62,7 @@
 
     final override fun resume(value: T) {
         while (true) { // lock-free loop on state
-            val state = getState() // atomic read
+            val state = this.state // atomic read
             when (state) {
                 is Incomplete -> if (updateState(state, value)) return
                 is Cancelled -> return // ignore resumes on cancelled continuation
@@ -73,7 +73,7 @@
 
     final override fun resumeWithException(exception: Throwable) {
         while (true) { // lock-free loop on state
-            val state = getState() // atomic read
+            val state = this.state // atomic read
             when (state) {
                 is Incomplete -> if (updateState(state, CompletedExceptionally(exception))) return
                 is Cancelled -> {
@@ -92,8 +92,8 @@
 
     // for nicer debugging
     override fun toString(): String {
-        val state = getState()
+        val state = this.state
         val result = if (state is Incomplete) "" else "[$state]"
-        return "${this::class.java.simpleName}{${describeState(state)}}$result@${Integer.toHexString(System.identityHashCode(this))}"
+        return "${this::class.java.simpleName}{${stateToString(state)}}$result@${Integer.toHexString(System.identityHashCode(this))}"
     }
 }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index c3954d3..38356b6 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -121,14 +121,14 @@
     context: CoroutineContext,
     active: Boolean
 ) : AbstractCoroutine<T>(context, active), Deferred<T> {
-    override val isCompletedExceptionally: Boolean get() = getState() is CompletedExceptionally
-    override val isCancelled: Boolean get() = getState() is Cancelled
+    override val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
+    override val isCancelled: Boolean get() = state is Cancelled
 
     @Suppress("UNCHECKED_CAST")
     suspend override fun await(): T {
         // fast-path -- check state (avoid extra object creation)
         while(true) { // lock-free loop on state
-            val state = this.getState()
+            val state = this.state
             if (state !is Incomplete) {
                 // already complete -- just return result
                 if (state is CompletedExceptionally) throw state.exception
@@ -143,7 +143,7 @@
     @Suppress("UNCHECKED_CAST")
     private suspend fun awaitSuspend(): T = suspendCancellableCoroutine { cont ->
         cont.unregisterOnCompletion(invokeOnCompletion {
-            val state = getState()
+            val state = this.state
             check(state !is Incomplete)
             if (state is CompletedExceptionally)
                 cont.resumeWithException(state.exception)
@@ -154,7 +154,7 @@
 
     @Suppress("UNCHECKED_CAST")
     override fun getCompleted(): T {
-        val state = getState()
+        val state = this.state
         check(state !is Incomplete) { "This deferred value has not completed yet" }
         if (state is CompletedExceptionally) throw state.exception
         return state as T
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 8e474da..ac587d2 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -16,10 +16,8 @@
 
 package kotlinx.coroutines.experimental
 
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import kotlinx.coroutines.experimental.internal.*
 import java.util.concurrent.Future
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
 import kotlin.coroutines.experimental.AbstractCoroutineContextElement
 import kotlin.coroutines.experimental.Continuation
@@ -260,10 +258,12 @@
 
        This state machine and its transition matrix are optimized for the common case when job is created in active
        state (EMPTY_A) and at most one completion listener is added to it during its life-time.
+
+       Note that the actual `_state` variable can also be a reference to an atomic operation descriptor (`OpDescriptor`).
      */
 
     @Volatile
-    private var state: Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
+    private var _state: Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
 
     @Volatile
     private var registration: Job.Registration? = null
@@ -271,9 +271,9 @@
     protected companion object {
         @JvmStatic
         private val STATE: AtomicReferenceFieldUpdater<JobSupport, Any?> =
-            AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "state")
+            AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "_state")
 
-        fun describeState(state: Any?): String =
+        fun stateToString(state: Any?): String =
                 if (state is Incomplete)
                     if (state.isActive) "Active" else "New"
                 else "Completed"
@@ -299,10 +299,16 @@
     /**
      * Returns current state of this job.
      */
-    protected fun getState(): Any? = state
+    protected val state: Any? get() {
+        while (true) { // lock-free helping loop
+            val state = _state
+            if (state !is OpDescriptor) return state
+            state.perform(this)
+        }
+    }
 
     /**
-     * Tries to update current [state][getState] of this job.
+     * Tries to update current [state] of this job.
      */
     internal fun updateState(expect: Any, update: Any?): Boolean {
         if (!tryUpdateState(expect, update)) return false
@@ -347,14 +353,20 @@
         afterCompletion(update)
     }
 
-    final override val isActive: Boolean get() {
+    public final override val isActive: Boolean get() {
         val state = this.state
         return state is Incomplete && state.isActive
     }
 
-    final override val isCompleted: Boolean get() = state !is Incomplete
+    public final override val isCompleted: Boolean get() = state !is Incomplete
 
-    final override fun start(): Boolean {
+    // this is for `select` operator. `isSelected` state means "not new" (== was started or already completed)
+    public val isSelected: Boolean get() {
+        val state = this.state
+        return state !is Incomplete || state.isActive
+    }
+
+    public final override fun start(): Boolean {
         while (true) { // lock-free loop on state
             when (startInternal(state)) {
                 0 -> return false
@@ -375,7 +387,7 @@
             // LIST -- a list of completion handlers (either new or active)
             state is NodeList -> {
                 if (state.isActive) return 0
-                if (!NodeList.ACTIVE.compareAndSet(state, 0, 1)) return -1
+                if (!NodeList.ACTIVE.compareAndSet(state, null, NodeList.ACTIVE_STATE)) return -1
                 onStart()
                 return 1
             }
@@ -384,13 +396,53 @@
         }
     }
 
+    internal fun describeStart(failureMarker: Any): AtomicDesc =
+        object : AtomicDesc() {
+            override fun prepare(op: AtomicOp): Any? {
+                while (true) { // lock-free loop on state
+                    val state = this@JobSupport._state
+                    when {
+                        state === op -> return null // already in progress
+                        state is OpDescriptor -> state.perform(this@JobSupport) // help
+                        state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
+                            if (STATE.compareAndSet(this@JobSupport, state, op)) return null // success
+                        }
+                        state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
+                            if (state.isActive) return failureMarker
+                            if (NodeList.ACTIVE.compareAndSet(state, null, op)) return null // success
+                        }
+                        else -> return failureMarker // not a new state
+                    }
+                }
+            }
+
+            override fun complete(op: AtomicOp, failure: Any?) {
+                val success = failure == null
+                val state = this@JobSupport._state
+                when {
+                    state === op -> {
+                        if (STATE.compareAndSet(this@JobSupport, op, if (success) EmptyActive else EmptyNew)) {
+                            if (success) onStart()
+                        }
+                    }
+                    state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
+                        if (state._active === op) {
+                            if (NodeList.ACTIVE.compareAndSet(state, op, if (success) NodeList.ACTIVE_STATE else null)) {
+                                if (success) onStart()
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
     /**
      * Override to provide the actual [start] action.
      */
     protected open fun onStart() {}
 
     final override fun getCompletionException(): Throwable {
-        val state = getState()
+        val state = this.state
         return when (state) {
             is Incomplete -> throw IllegalStateException("Job has not completed yet")
             is CompletedExceptionally -> state.exception
@@ -414,14 +466,14 @@
                 // EMPTY_NEW state -- no completion handlers, new
                 state === EmptyNew -> {
                     // try to promote it to list in new state
-                    STATE.compareAndSet(this, state, NodeList(active = 0))
+                    STATE.compareAndSet(this, state, NodeList(active = false))
                 }
                 // SINGLE/SINGLE+ state -- one completion handler
                 state is JobNode<*> -> {
                     // try to promote it to list (SINGLE+ state)
-                    state.addFirstIfEmpty(NodeList(active = 1))
+                    state.addOneIfEmpty(NodeList(active = true))
                     // it must be in SINGLE+ state or state has changed (node could have need removed from state)
-                    val list = state.next() // either NodeList or somebody else won the race, updated state
+                    val list = state.next // either NodeList or somebody else won the race, updated state
                     // just attempt converting it to list if state is still the same, then continue lock-free loop
                     STATE.compareAndSet(this, state, list)
                 }
@@ -498,25 +550,36 @@
                     ?: InvokeOnCompletion(this, handler)
 
     // for nicer debugging
-    override fun toString(): String = "${this::class.java.simpleName}{${describeState(state)}}@${Integer.toHexString(System.identityHashCode(this))}"
+    override fun toString(): String = "${this::class.java.simpleName}{${stateToString(state)}}@${Integer.toHexString(System.identityHashCode(this))}"
 
     /**
-     * Interface for incomplete [state][getState] of a job.
+     * Interface for incomplete [state] of a job.
      */
     public interface Incomplete {
         val isActive: Boolean
     }
 
     private class NodeList(
-        @Volatile
-        var active: Int
+        active: Boolean
     ) : LockFreeLinkedListHead(), Incomplete {
-        override val isActive: Boolean get() = active != 0
+        @Volatile
+        var _active: Any? = if (active) ACTIVE_STATE else null
+
+        override val isActive: Boolean get() {
+            while (true) { // helper loop for atomic ops
+                val active = this._active
+                if (active !is OpDescriptor) return active != null
+                active.perform(this)
+            }
+        }
 
         companion object {
             @JvmStatic
-            val ACTIVE: AtomicIntegerFieldUpdater<NodeList> =
-                    AtomicIntegerFieldUpdater.newUpdater(NodeList::class.java, "active")
+            val ACTIVE: AtomicReferenceFieldUpdater<NodeList, Any?> =
+                    AtomicReferenceFieldUpdater.newUpdater(NodeList::class.java, Any::class.java, "_active")
+
+            @JvmStatic
+            val ACTIVE_STATE = Symbol("ACTIVE_STATE")
         }
 
         override fun toString(): String = buildString {
@@ -533,7 +596,7 @@
     }
 
     /**
-     * Class for a [state][getState] of a job that had completed exceptionally, including cancellation.
+     * Class for a [state] of a job that had completed exceptionally, including cancellation.
      *
      * @param cause the exceptional completion cause. If `cause` is null, then a [CancellationException]
      *        if created on first get from [exception] property.
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index b8a58e8..a335f0e 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -63,17 +63,17 @@
     /**
      * Returns non-null closed token if it is first in the queue.
      */
-    protected val closedForReceive: Any? get() = queue.next() as? Closed<*>
+    protected val closedForReceive: Any? get() = queue.next as? Closed<*>
 
     /**
      * Returns non-null closed token if it is last in the queue.
      */
-    protected val closedForSend: Any? get() = queue.prev() as? Closed<*>
+    protected val closedForSend: Any? get() = queue.prev as? Closed<*>
 
     // ------ SendChannel ------
 
     public final override val isClosedForSend: Boolean get() = closedForSend != null
-    public final override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*> && isBufferFull
+    public final override val isFull: Boolean get() = queue.next !is ReceiveOrClosed<*> && isBufferFull
 
     public final override suspend fun send(element: E) {
         // fast path -- try offer non-blocking
@@ -152,7 +152,7 @@
     // ------ ReceiveChannel ------
 
     public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
-    public final override val isEmpty: Boolean get() = queue.next() !is Send && isBufferEmpty
+    public final override val isEmpty: Boolean get() = queue.next !is Send && isBufferEmpty
 
     @Suppress("UNCHECKED_CAST")
     public final override suspend fun receive(): E {
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
new file mode 100644
index 0000000..5f5ace0
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
@@ -0,0 +1,71 @@
+package kotlinx.coroutines.experimental.internal
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+
+/**
+ * The most abstract operation that can be in process. Other threads observing an instance of this
+ * class in the fields of their object shall invoke [perform] to help.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public abstract class OpDescriptor {
+    /**
+     * Returns `null` if the operation was performed successfully, or some other
+     * object that indicates the failure reason.
+     */
+    abstract fun perform(affected: Any?): Any?
+}
+
+/**
+ * Descriptor for multi-word atomic operation.
+ * Based on paper
+ * ["A Practical Multi-Word Compare-and-Swap Operation"](http://www.cl.cam.ac.uk/research/srg/netos/papers/2002-casn.pdf)
+ * by Timothy L. Harris, Keir Fraser and Ian A. Pratt.
+ *
+ * Note: parts of atomic operation must be globally ordered. Otherwise, this implementation will produce
+ * [StackOverflowError].
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public abstract class AtomicOp : OpDescriptor() {
+    @Volatile
+    private var _consensus: Any? = UNDECIDED
+
+    companion object {
+        @JvmStatic
+        private val CONSENSUS: AtomicReferenceFieldUpdater<AtomicOp, Any?> =
+            AtomicReferenceFieldUpdater.newUpdater(AtomicOp::class.java, Any::class.java, "_consensus")
+
+        private val UNDECIDED: Any = Symbol("UNDECIDED")
+    }
+
+    val isDecided: Boolean get() = _consensus !== UNDECIDED
+
+    abstract fun prepare(): Any? // `null` if Ok, or failure reason
+    abstract fun complete(affected: Any?, failure: Any?) // failure != null if failed to prepare op
+
+    // returns `null` on success
+    final override fun perform(affected: Any?): Any? {
+        // make decision on status
+        var decision: Any?
+        while (true) {
+            decision = this._consensus
+            if (decision !== UNDECIDED) break
+            decision = prepare()
+            check(decision !== UNDECIDED)
+            if (CONSENSUS.compareAndSet(this, UNDECIDED, decision)) break
+        }
+        complete(affected, decision)
+        return decision
+    }
+}
+
+/**
+ * A part of multi-step atomic operation [AtomicOp].
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public abstract class AtomicDesc {
+    abstract fun prepare(op: AtomicOp): Any? // returns `null` if prepared successfully
+    abstract fun complete(op: AtomicOp, failure: Any?) // failure == null if success
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
index f6f14e4..10f95ee 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
@@ -16,7 +16,6 @@
 
 package kotlinx.coroutines.experimental.internal
 
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
 
 private typealias Node = LockFreeLinkedListNode
@@ -30,24 +29,45 @@
 @PublishedApi
 internal const val FAILURE = 2
 
+@PublishedApi
+internal val CONDITION_FALSE: Any = Symbol("CONDITION_FALSE")
+
+@PublishedApi
+internal val ALREADY_REMOVED: Any = Symbol("ALREADY_REMOVED")
+
+@PublishedApi
+internal val LIST_EMPTY: Any = Symbol("LIST_EMPTY")
+
+private val REMOVE_PREPARED: Any = Symbol("REMOVE_PREPARED")
+
+/**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public typealias RemoveFirstDesc<T> = LockFreeLinkedListNode.RemoveFirstDesc<T>
+
 /**
  * Doubly-linked concurrent list node with remove support.
  * Based on paper
  * ["Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap"](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.140.4693&rep=rep1&type=pdf)
  * by Sundell and Tsigas.
- * The instance of this class serves both as list head/tail sentinel and as the list item.
- * Sentinel node should be never removed.
+ *
+ * Important notes:
+ * * The instance of this class serves both as list head/tail sentinel and as the list item.
+ *   The sentinel node should never be removed.
+ * * There are no operations to add items to left side of the list, only to the end (right side), because we cannot
+ *   efficiently linearize them with atomic multi-step head-removal operations. In short,
+ *   support for [describeRemoveFirst] operation precludes ability to add items at the beginning.
  *
  * @suppress **This is unstable API and it is subject to change.**
  */
 @Suppress("LeakingThis")
 public open class LockFreeLinkedListNode {
     @Volatile
-    private var _next: Any = this // DoubleLinkedNode | Removed | CondAdd
+    private var _next: Any = this // DoubleLinkedNode | Removed | OpDescriptor
     @Volatile
-    private var prev: Any = this // DoubleLinkedNode | Removed
+    private var _prev: Any = this // DoubleLinkedNode | Removed
     @Volatile
-    private var removedRef: Removed? = null // lazily cached removed ref to this
+    private var _removedRef: Removed? = null // lazily cached removed ref to this
 
     private companion object {
         @JvmStatic
@@ -55,114 +75,70 @@
                 AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Any::class.java, "_next")
         @JvmStatic
         val PREV: AtomicReferenceFieldUpdater<Node, Any> =
-                AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Any::class.java, "prev")
+                AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Any::class.java, "_prev")
         @JvmStatic
         val REMOVED_REF: AtomicReferenceFieldUpdater<Node, Removed?> =
-            AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Removed::class.java, "removedRef")
-    }
-
-    private class Removed(val ref: Node) {
-        override fun toString(): String = "Removed[$ref]"
+            AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Removed::class.java, "_removedRef")
     }
 
     private fun removed(): Removed =
-        removedRef ?: Removed(this).also { REMOVED_REF.lazySet(this, it) }
+        _removedRef ?: Removed(this).also { REMOVED_REF.lazySet(this, it) }
 
     @PublishedApi
-    internal abstract class CondAdd(val newNode: Node) {
+    internal abstract class CondAddOp(val newNode: Node) : AtomicOp() {
         lateinit var oldNext: Node
-        @Volatile
-        private var consensus: Int = UNDECIDED // status of operation
 
-        abstract fun isCondition(): Boolean
-
-        private companion object {
-            @JvmStatic
-            val CONSENSUS: AtomicIntegerFieldUpdater<CondAdd> =
-                    AtomicIntegerFieldUpdater.newUpdater(CondAdd::class.java, "consensus")
-        }
-
-        // returns either SUCCESS or FAILURE
-        fun completeAdd(node: Node): Int {
-            // make decision on status
-            var consensus: Int
-            while (true) {
-                consensus = this.consensus
-                if (consensus != UNDECIDED) break
-                val proposal = if (isCondition()) SUCCESS else FAILURE
-                if (CONSENSUS.compareAndSet(this, UNDECIDED, proposal)) {
-                    consensus = proposal
-                    break
-                }
-            }
-            val success = consensus == SUCCESS
-            if (NEXT.compareAndSet(node, this, if (success) newNode else oldNext)) {
+        override fun complete(affected: Any?, failure: Any?) {
+            affected as Node // type assertion
+            val success = failure == null
+            val update = if (success) newNode else oldNext
+            if (NEXT.compareAndSet(affected, this, update)) {
                 // only the thread the makes this update actually finishes add operation
                 if (success) newNode.finishAdd(oldNext)
             }
-            return consensus
         }
     }
 
     @PublishedApi
-    internal inline fun makeCondAdd(node: Node, crossinline condition: () -> Boolean): CondAdd = object : CondAdd(node) {
-        override fun isCondition(): Boolean = condition()
-    }
+    internal inline fun makeCondAddOp(node: Node, crossinline condition: () -> Boolean): CondAddOp =
+        object : CondAddOp(node) {
+            override fun prepare(): Any? = if (condition()) null else CONDITION_FALSE
+        }
 
-    public val isRemoved: Boolean get() = _next is Removed
+    public val isRemoved: Boolean get() = next is Removed
 
-    private val isFresh: Boolean get() = _next === this && prev === this
-
-    @PublishedApi
-    internal val next: Any get() {
-        while (true) { // helper loop on _next
+    // LINEARIZABLE.
+    public val next: Any get() {
+        while (true) { // operation helper loop on _next
             val next = this._next
-            if (next !is CondAdd) return next
-            next.completeAdd(this)
+            if (next !is OpDescriptor) return next
+            next.perform(this)
         }
     }
 
-    public fun next(): Node = next.unwrap()
-
-    public fun prev(): Node {
+    // LINEARIZABLE. Note: use it on sentinel (never removed) node only
+    public val prev: Node get() {
         while (true) {
-            prevHelper()?.let { return it.unwrap() }
+            val prev = this._prev as Node // this sentinel node is never removed
+            if (prev.next === this) return prev
+            helpInsert(prev)
         }
     }
 
-    // ------ addFirstXXX ------
+    // ------ addOneIfEmpty ------
 
-    /**
-     * Adds first item to this list.
-     */
-    public fun addFirst(node: Node) {
-        while (true) { // lock-free loop on next
-            val next = this.next as Node // this sentinel node is never removed
-            if (addNext(node, next)) return
-        }
-    }
-
-    /**
-     * Adds first item to this list atomically if the [condition] is true.
-     */
-    public inline fun addFirstIf(node: Node, crossinline condition: () -> Boolean): Boolean {
-        val condAdd = makeCondAdd(node, condition)
-        while (true) { // lock-free loop on next
-            val next = this.next as Node // this sentinel node is never removed
-            when (tryCondAddNext(node, next, condAdd)) {
-                SUCCESS -> return true
-                FAILURE -> return false
-            }
-        }
-    }
-
-    public fun addFirstIfEmpty(node: Node): Boolean {
+    public fun addOneIfEmpty(node: Node): Boolean {
         PREV.lazySet(node, this)
         NEXT.lazySet(node, this)
-        if (!NEXT.compareAndSet(this, this, node)) return false // this is not an empty list!
-        // added successfully (linearized add) -- fixup the list
-        node.finishAdd(this)
-        return true
+        while (true) {
+            val next = next
+            if (next !== this) return false // this is not an empty list!
+            if (NEXT.compareAndSet(this, this, node)) {
+                // added successfully (linearized add) -- fixup the list
+                node.finishAdd(this)
+                return true
+            }
+        }
     }
 
     // ------ addLastXXX ------
@@ -172,7 +148,7 @@
      */
     public fun addLast(node: Node) {
         while (true) { // lock-free loop on prev.next
-            val prev = prevHelper() ?: continue
+            val prev = prev
             if (prev.addNext(node, this)) return
         }
     }
@@ -181,9 +157,9 @@
      * Adds last item to this list atomically if the [condition] is true.
      */
     public inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean {
-        val condAdd = makeCondAdd(node, condition)
+        val condAdd = makeCondAddOp(node, condition)
         while (true) { // lock-free loop on prev.next
-            val prev = prevHelper() ?: continue
+            val prev = prev
             when (prev.tryCondAddNext(node, this, condAdd)) {
                 SUCCESS -> return true
                 FAILURE -> return false
@@ -193,7 +169,7 @@
 
     public inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean {
         while (true) { // lock-free loop on prev.next
-            val prev = prevHelper() ?: continue
+            val prev = prev
             if (!predicate(prev)) return false
             if (prev.addNext(node, this)) return true
         }
@@ -204,9 +180,9 @@
             predicate: (Node) -> Boolean, // prev node predicate
             crossinline condition: () -> Boolean // atomically checked condition
     ): Boolean {
-        val condAdd = makeCondAdd(node, condition)
+        val condAdd = makeCondAddOp(node, condition)
         while (true) { // lock-free loop on prev.next
-            val prev = prevHelper() ?: continue
+            val prev = prev
             if (!predicate(prev)) return false
             when (prev.tryCondAddNext(node, this, condAdd)) {
                 SUCCESS -> return true
@@ -215,14 +191,6 @@
         }
     }
 
-    @PublishedApi
-    internal fun prevHelper(): Node? {
-        val prev = this.prev as Node // this sentinel node is never removed
-        if (prev.next === this) return prev
-        helpInsert(prev)
-        return null
-    }
-
     // ------ addXXX util ------
 
     @PublishedApi
@@ -237,13 +205,13 @@
 
     // returns UNDECIDED, SUCCESS or FAILURE
     @PublishedApi
-    internal fun tryCondAddNext(node: Node, next: Node, condAdd: CondAdd): Int {
+    internal fun tryCondAddNext(node: Node, next: Node, condAdd: CondAddOp): Int {
         PREV.lazySet(node, this)
         NEXT.lazySet(node, next)
         condAdd.oldNext = next
         if (!NEXT.compareAndSet(this, next, condAdd)) return UNDECIDED
         // added operation successfully (linearized) -- complete it & fixup the list
-        return condAdd.completeAdd(this)
+        return if (condAdd.perform(this) == null) SUCCESS else FAILURE
     }
 
     // ------ removeXXX ------
@@ -255,40 +223,165 @@
         while (true) { // lock-free loop on next
             val next = this.next
             if (next is Removed) return false // was already removed -- don't try to help (original thread will take care)
+            check(next !== this) // sanity check -- next === this can happen for sentinel nodes only, but they are never removed
             if (NEXT.compareAndSet(this, next, (next as Node).removed())) {
                 // was removed successfully (linearized remove) -- fixup the list
-                helpDelete()
-                next.helpInsert(prev.unwrap())
+                finishRemove(next)
                 return true
             }
         }
     }
 
-    public fun removeFirstOrNull(): Node? {
-        while (true) { // try to linearize
-            val first = next()
-            if (first == this) return null
-            if (first.remove()) return first
+    public open fun describeRemove() : AtomicDesc? {
+        if (isRemoved) return null // fast path if was already removed
+        return object : AbstractAtomicDesc() {
+            override val affectedNode: Node? get() = this@LockFreeLinkedListNode
+            override var originalNext: Node? = null
+            override fun failure(affected: Node, next: Any): Any? =
+                if (next is Removed) ALREADY_REMOVED else null
+            override fun onPrepare(affected: Node, next: Node): Boolean {
+                originalNext = next
+                return true
+            }
+            override fun updatedNext(next: Node) = next.removed()
+            override fun finishOnSuccess(affected: Node, next: Node) = finishRemove(next)
         }
     }
 
+    public fun removeFirstOrNull(): Node? {
+        while (true) { // try to linearize
+            val first = next as Node
+            if (first === this) return null
+            if (first.remove()) return first
+            first.helpDelete() // must help delete, or lose lock-freedom
+        }
+    }
+
+    public fun describeRemoveFirst(): RemoveFirstDesc<Node> = RemoveFirstDesc(this)
+
     public inline fun <reified T> removeFirstIfIsInstanceOf(): T? {
         while (true) { // try to linearize
-            val first = next()
-            if (first == this) return null
+            val first = next as Node
+            if (first === this) return null
             if (first !is T) return null
             if (first.remove()) return first
+            first.helpDelete() // must help delete, or lose lock-freedom
         }
     }
 
     // just peek at item when predicate is true
     public inline fun <reified T> removeFirstIfIsInstanceOfOrPeekIf(predicate: (T) -> Boolean): T? {
         while (true) { // try to linearize
-            val first = next()
-            if (first == this) return null
+            val first = next as Node
+            if (first === this) return null
             if (first !is T) return null
             if (predicate(first)) return first // just peek when predicate is true
             if (first.remove()) return first
+            first.helpDelete() // must help delete, or lose lock-freedom
+        }
+    }
+
+    // ------ multi-word atomic operations helpers ------
+
+    public open class RemoveFirstDesc<T>(val queue: Node) : AbstractAtomicDesc() {
+        @Suppress("UNCHECKED_CAST")
+        public val result: T get() = affectedNode!! as T
+
+        final override fun takeAffectedNode(): Node = queue.next as Node
+        final override var affectedNode: Node? = null
+        final override var originalNext: Node? = null
+
+        // check node predicates here, must signal failure if affected is not of type T
+        protected override fun failure(affected: Node, next: Any): Any? =
+                if (affected === queue) LIST_EMPTY else null
+        // validate the resulting node (return false if it should be deleted)
+        protected open fun validatePrepared(node: T): Boolean = true // false means remove node & retry
+
+        final override fun retry(affected: Node, next: Any): Boolean {
+            if (next !is Removed) return false
+            affected.helpDelete() // must help delete, or lose lock-freedom
+            return true
+        }
+        @Suppress("UNCHECKED_CAST")
+        final override fun onPrepare(affected: Node, next: Node): Boolean {
+            check(affected !is LockFreeLinkedListHead)
+            if (!validatePrepared(affected as T)) return false
+            affectedNode = affected
+            originalNext = next
+            return true
+        }
+        final override fun updatedNext(next: Node): Any = next.removed()
+        final override fun finishOnSuccess(affected: Node, next: Node) = affected.finishRemove(next)
+    }
+
+    public abstract class AbstractAtomicDesc : AtomicDesc() {
+        protected abstract val affectedNode: Node?
+        protected abstract val originalNext: Node?
+        protected open fun takeAffectedNode(): Node = affectedNode!!
+        protected open fun failure(affected: Node, next: Any): Any? = null // next: Node | Removed
+        protected open fun retry(affected: Node, next: Any): Boolean = false // next: Node | Removed
+        protected abstract fun onPrepare(affected: Node, next: Node): Boolean // false means: remove node & retry
+        protected abstract fun updatedNext(next: Node): Any
+        protected abstract fun finishOnSuccess(affected: Node, next: Node)
+
+        // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation
+        // It inserts the "op" descriptor only when the "op" status is still undecided (rolls back otherwise)
+        private class PrepareOp(
+            val next: Node,
+            val op: AtomicOp,
+            val desc: AbstractAtomicDesc
+        ) : OpDescriptor() {
+            override fun perform(affected: Any?): Any? {
+                affected as Node // type assertion
+                if (!desc.onPrepare(affected, next)) return REMOVE_PREPARED
+                check(desc.affectedNode === affected)
+                check(desc.originalNext === next)
+                val update: Any = if (op.isDecided) next else op // restore if decision was already reached
+                NEXT.compareAndSet(affected, this, update)
+                return null // ok
+            }
+        }
+
+        final override fun prepare(op: AtomicOp): Any? {
+            while (true) { // lock free loop on next
+                val affected = takeAffectedNode()
+                // read its original next pointer first
+                val next = affected._next
+                // then see if already reached consensus on overall operation
+                if (op.isDecided) return null // already decided -- go to next desc
+                if (next === op) return null // already in process of operation -- all is good
+                if (next is OpDescriptor) {
+                    // some other operation is in process -- help it
+                    next.perform(affected)
+                    continue // and retry
+                }
+                // next: Node | Removed
+                val failure = failure(affected, next)
+                if (failure != null) return failure // signal failure
+                if (retry(affected, next)) continue // retry operation
+                val prepareOp = PrepareOp(next as Node, op, this)
+                if (NEXT.compareAndSet(affected, next, prepareOp)) {
+                    // prepared -- complete preparations
+                    val prepFail = prepareOp.perform(affected) ?: return null // prepared successfully
+                    check(prepFail === REMOVE_PREPARED) // the only way for prepare to fail
+                    if (NEXT.compareAndSet(affected, prepareOp, next.removed())) {
+                        affected.helpDelete()
+                    }
+                }
+            }
+        }
+
+        final override fun complete(op: AtomicOp, failure: Any?) {
+            val success = failure == null
+            val update = if (success) updatedNext(originalNext!!) else originalNext
+            val affectedNode = affectedNode
+            if (affectedNode == null) {
+                check(!success)
+                return
+            }
+            if (NEXT.compareAndSet(affectedNode, op, update)) {
+                if (success) finishOnSuccess(affectedNode, originalNext!!)
+            }
         }
     }
 
@@ -296,7 +389,7 @@
 
     private fun finishAdd(next: Node) {
         while (true) {
-            val nextPrev = next.prev
+            val nextPrev = next._prev
             if (nextPrev is Removed || this.next !== next) return // next was removed, remover fixes up links
             if (PREV.compareAndSet(next, nextPrev, this)) {
                 if (this.next is Removed) {
@@ -308,16 +401,22 @@
         }
     }
 
+    private fun finishRemove(next: Node) {
+        helpDelete()
+        next.helpInsert(_prev.unwrap())
+    }
+
     private fun markPrev(): Node {
         while (true) { // lock-free loop on prev
-            val prev = this.prev
+            val prev = this._prev
             if (prev is Removed) return prev.ref
             if (PREV.compareAndSet(this, prev, (prev as Node).removed())) return prev
         }
     }
 
     // fixes next links to the left of this node
-    private fun helpDelete() {
+    @PublishedApi
+    internal fun helpDelete() {
         var last: Node? = null // will set to the node left of prev when found
         var prev: Node = markPrev()
         var next: Node = (this._next as Removed).ref
@@ -338,7 +437,7 @@
                     prev = last
                     last = null
                 } else {
-                    prev = prev.prev.unwrap()
+                    prev = prev._prev.unwrap()
                 }
                 continue
             }
@@ -368,11 +467,11 @@
                     prev = last
                     last = null
                 } else {
-                    prev = prev.prev.unwrap()
+                    prev = prev._prev.unwrap()
                 }
                 continue
             }
-            val oldPrev = this.prev
+            val oldPrev = this._prev
             if (oldPrev is Removed) return // this node was removed, too -- its remover will take care
             if (prevNext !== this) {
                 // need to fixup next
@@ -382,50 +481,56 @@
             }
             if (oldPrev === prev) return // it is already linked as needed
             if (PREV.compareAndSet(this, oldPrev, prev)) {
-                if (prev.prev !is Removed) return // finish only if prev was not concurrently removed
+                if (prev._prev !is Removed) return // finish only if prev was not concurrently removed
             }
         }
     }
 
-    private fun Any.unwrap(): Node = if (this is Removed) ref else this as Node
-
     internal fun validateNode(prev: Node, next: Node) {
-        check(prev === this.prev)
-        check(next === this.next)
+        check(prev === this._prev)
+        check(next === this._next)
     }
 }
 
+private class Removed(val ref: Node) {
+    override fun toString(): String = "Removed[$ref]"
+}
+
+@PublishedApi
+internal fun Any.unwrap(): Node = if (this is Removed) ref else this as Node
+
 /**
  * Head (sentinel) item of the linked list that is never removed.
  *
  * @suppress **This is unstable API and it is subject to change.**
  */
 public open class LockFreeLinkedListHead : LockFreeLinkedListNode() {
-    public val isEmpty: Boolean get() = next() == this
+    public val isEmpty: Boolean get() = next === this
 
     /**
      * Iterates over all elements in this list of a specified type.
      */
     public inline fun <reified T : Node> forEach(block: (T) -> Unit) {
-        var cur: Node = next()
+        var cur: Node = next as Node
         while (cur != this) {
             if (cur is T) block(cur)
-            cur = cur.next()
+            cur = cur.next.unwrap()
         }
     }
 
     // just a defensive programming -- makes sure that list head sentinel is never removed
     public final override fun remove() = throw UnsupportedOperationException()
+    public final override fun describeRemove(): AtomicDesc? = throw UnsupportedOperationException()
 
     internal fun validate() {
         var prev: Node = this
-        var cur: Node = next()
+        var cur: Node = next as Node
         while (cur != this) {
-            val next = cur.next()
+            val next = cur.next.unwrap()
             cur.validateNode(prev, next)
             prev = cur
             cur = next
         }
-        validateNode(prev, next())
+        validateNode(prev, next as Node)
     }
 }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
new file mode 100644
index 0000000..d34f865
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.internal
+
+internal class Symbol(val symbol: String) {
+    override fun toString(): String = symbol
+}