ArrayChannel implementation and tests
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 4daf5f1..1b1adad 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -22,13 +22,16 @@
     val isCancelled: Boolean
 
     /**
-     * Tries to resume this continuation with a given value and returns `true` if it was successful,
-     * or `false` otherwise (it was already resumed or cancelled).
-     *
-     * An optional [onSuccess] callback is invoked with [value] as its parameter after the state of this continuation
-     * is updated (so that is cannot be cancelled anymore), but before it is actually resumed.
+     * Tries to resume this continuation with a given value and returns non-null object token if it was successful,
+     * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
+     * [completeResume] must be invoked with it.
      */
-    fun tryResume(value: T, onSuccess: ((Any?) -> Unit)? = null): Boolean
+    fun tryResume(value: T): Any?
+
+    /**
+     * Completes the execution of [tryResume] on its non-null result.
+     */
+    fun completeResume(token: Any)
 
     /**
      * Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
@@ -104,16 +107,20 @@
     override val isCancelled: Boolean
         get() = getState() is Cancelled
 
-    override fun tryResume(value: T, onSuccess: ((Any?) -> Unit)?): Boolean {
+    override fun tryResume(value: T): Any? {
         while (true) { // lock-free loop on state
             val state = getState() // atomic read
             when (state) {
-                is Active -> if (updateState(state, value, onSuccess)) return true
-                else -> return false // cannot resume -- not active anymore
+                is Active -> if (tryUpdateState(state, value)) return state
+                else -> return null // cannot resume -- not active anymore
             }
         }
     }
 
+    override fun completeResume(token: Any) {
+        completeUpdateState(token, getState())
+    }
+
     @Suppress("UNCHECKED_CAST")
     override fun afterCompletion(state: Any?) {
         val decision = this.decision // volatile read
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index d34edc8..9147ede 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -213,16 +213,24 @@
     /**
      * Tries to update current [state][getState] of this job.
      */
-    fun updateState(expect: Any, update: Any?, onSuccess: ((Any?) -> Unit)? = null): Boolean {
+    fun updateState(expect: Any, update: Any?): Boolean {
+        if (!tryUpdateState(expect, update)) return false
+        completeUpdateState(expect, update)
+        return true
+    }
+
+    fun tryUpdateState(expect: Any, update: Any?): Boolean  {
         require(expect is Active && update !is Active) // only active -> inactive transition is allowed
         if (!STATE.compareAndSet(this, expect, update)) return false
         // #1. Update linked state before invoking completion handlers
         onStateUpdate(update)
         // #2. Unregister from parent job
         registration?.unregister() // volatile read registration _after_ state was updated
-        // #3. Additional (optional) callback
-        onSuccess?.invoke(update)
-        // #4. Invoke completion handlers
+        return true // continues in completeUpdateState
+    }
+
+    fun completeUpdateState(expect: Any, update: Any?) {
+        // #3. Invoke completion handlers
         val reason = (update as? CompletedExceptionally)?.cancelReason
         var completionException: Throwable? = null
         when (expect) {
@@ -244,10 +252,9 @@
             // otherwise -- do nothing (Empty)
             else -> check(expect == Empty)
         }
-        // #5. Do other (overridable) processing after completion handlers
+        // #4. Do other (overridable) processing after completion handlers
         completionException?.let { handleCompletionException(it) }
         afterCompletion(update)
-        return true
     }
 
     final override val isActive: Boolean get() = state is Active
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
new file mode 100644
index 0000000..7cd231f
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -0,0 +1,317 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CancellableContinuation
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import kotlinx.coroutines.experimental.removeOnCompletion
+import kotlinx.coroutines.experimental.suspendCancellableCoroutine
+
+/**
+ * Abstract channel. It is a base class for buffered and unbuffered channels.
+ */
+public abstract class AbstractChannel<E> : Channel<E> {
+    private val queue = LockFreeLinkedListHead()
+
+    // ------ extension points for buffered channels ------
+
+    protected abstract val hasBuffer: Boolean
+    protected abstract val isBufferEmpty: Boolean
+    protected abstract val isBufferFull: Boolean
+
+    /**
+     * Tries to add element to buffer or to queued receiver.
+     * Return type is `OFFER_SUCCESS | OFFER_FAILED | OFFER_CLOSED`.
+     */
+    protected abstract fun offerInternal(element: E): Int
+
+    /**
+     * Tries to remove element from buffer or from queued sender.
+     * Return type is `E | POLL_EMPTY | POLL_CLOSED`
+     */
+    protected abstract fun pollInternal(): Any?
+
+
+    // ------ state function for concrete implementations ------
+
+    protected val isClosedTokenFirstInQueue: Boolean get() = queue.next() is Closed<*>
+
+    // ------ SendChannel ------
+
+    override val isClosedForSend: Boolean get() = queue.prev() is Closed<*>
+    override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*> && isBufferFull
+
+    suspend override fun send(element: E) {
+        // fast path -- try offer non-blocking
+        if (offer(element)) return
+        // slow-path does suspend
+        return sendSuspend(element)
+    }
+
+    override fun offer(element: E): Boolean =
+        when (offerInternal(element)) {
+            OFFER_SUCCESS -> true
+            OFFER_FAILED -> false
+            else -> throw ClosedSendChannelException()
+        }
+
+    private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutine(true) sc@ { cont ->
+        val send = SendElement(cont, element)
+        loop@ while (true) {
+            if (enqueueSend(send)) {
+                cont.initCancellability() // make it properly cancellable
+                cont.removeOnCompletion(send)
+                return@sc
+            }
+            // hm... something is not right. try to offer
+            when (offerInternal(element)) {
+                OFFER_SUCCESS -> {
+                    cont.resume(Unit)
+                    return@sc
+                }
+                OFFER_CLOSED -> {
+                    cont.resumeWithException(ClosedSendChannelException())
+                    return@sc
+                }
+            }
+        }
+    }
+
+    private fun enqueueSend(send: SendElement) =
+        if (hasBuffer)
+            queue.addLastIfPrevAndIf(send, { it !is ReceiveOrClosed<*> }, { isBufferFull })
+        else
+            queue.addLastIfPrev(send, { it !is ReceiveOrClosed<*> })
+
+    override fun close() {
+        while (true) {
+            val receive = takeFirstReceiveOrPeekClosed()
+            if (receive == null) {
+                // queue empty or has only senders -- try add last "Closed" item to the queue
+                if (queue.addLastIfPrev(Closed<E>(), { it !is ReceiveOrClosed<*> })) return
+                continue // retry on failure
+            }
+            if (receive is Closed<*>) return // already marked as closed -- nothing to do
+            receive as Receive<E> // type assertion
+            receive.resumeReceiveClosed()
+        }
+    }
+
+    protected fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> }
+
+    // ------ ReceiveChannel ------
+
+    override val isClosedForReceive: Boolean get() = isClosedTokenFirstInQueue && isBufferEmpty
+    override val isEmpty: Boolean get() = queue.next() !is Send && isBufferEmpty
+
+    @Suppress("UNCHECKED_CAST")
+    suspend override fun receive(): E {
+        // fast path -- try poll non-blocking
+        val result = pollInternal()
+        if (result === POLL_CLOSED) throw ClosedReceiveChannelException()
+        if (result !== POLL_EMPTY) return result as E
+        // slow-path does suspend
+        return receiveSuspend()
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private suspend fun receiveSuspend(): E = suspendCancellableCoroutine(true) sc@ { cont ->
+        val receive = ReceiveNonNull(cont)
+        while (true) {
+            if (enqueueReceive(receive)) {
+                cont.initCancellability() // make it properly cancellable
+                cont.removeOnCompletion(receive)
+                return@sc
+            }
+            // hm... something is not right. try to poll
+            val result = pollInternal()
+            if (result === POLL_CLOSED) {
+                cont.resumeWithException(ClosedReceiveChannelException())
+                return@sc
+            }
+            if (result !== POLL_EMPTY) {
+                cont.resume(result as E)
+                return@sc
+            }
+        }
+    }
+
+    private fun enqueueReceive(receive: Receive<E>) =
+        if (hasBuffer)
+            queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
+        else
+            queue.addLastIfPrev(receive, { it !is Send })
+
+    @Suppress("UNCHECKED_CAST")
+    suspend override fun receiveOrNull(): E? {
+        // fast path -- try poll non-blocking
+        val result = pollInternal()
+        if (result === POLL_CLOSED) return null
+        if (result !== POLL_EMPTY) return result as E
+        // slow-path does suspend
+        return receiveOrNullSuspend()
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private suspend fun receiveOrNullSuspend(): E? = suspendCancellableCoroutine(true) sc@ { cont ->
+        val receive = ReceiveOrNull(cont)
+        while (true) {
+            if (enqueueReceive(receive)) {
+                cont.initCancellability() // make it properly cancellable
+                cont.removeOnCompletion(receive)
+                return@sc
+            }
+            // hm... something is not right. try to poll
+            val result = pollInternal()
+            if (result === POLL_CLOSED) {
+                cont.resume(null)
+                return@sc
+            }
+            if (result !== POLL_EMPTY) {
+                cont.resume(result as E)
+                return@sc
+            }
+        }
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    override fun poll(): E? {
+        val result = pollInternal()
+        return if (result === POLL_EMPTY || result === POLL_CLOSED) null else result as E
+    }
+
+    override fun iterator(): ChannelIterator<E> = Iterator(this)
+
+    protected fun takeFirstSendOrPeekClosed(): Send? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
+
+    protected companion object {
+        const val OFFER_SUCCESS = 0
+        const val OFFER_FAILED = 1
+        const val OFFER_CLOSED = 2
+
+        val POLL_EMPTY: Any = Marker("POLL_EMPTY")
+        val POLL_CLOSED: Any = Marker("POLL_CLOSED")
+    }
+
+    // for debugging
+    private class Marker(val string: String) {
+        override fun toString(): String = string
+    }
+
+    private class Iterator<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
+        var result: Any? = POLL_EMPTY // E | POLL_CLOSED | POLL_EMPTY
+
+        suspend override fun hasNext(): Boolean {
+            // check for repeated hasNext
+            if (result !== POLL_EMPTY) return result !== POLL_CLOSED
+            // fast path -- try poll non-blocking
+            result = channel.pollInternal()
+            if (result !== POLL_EMPTY) return result !== POLL_CLOSED
+            // slow-path does suspend
+            return hasNextSuspend()
+        }
+
+        private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutine(true) sc@ { cont ->
+            val receive = ReceiveHasNext(this, cont)
+            while (true) {
+                if (channel.enqueueReceive(receive)) {
+                    cont.initCancellability() // make it properly cancellable
+                    cont.removeOnCompletion(receive)
+                    return@sc
+                }
+                // hm... something is not right. try to poll
+                result = channel.pollInternal()
+                if (result === POLL_CLOSED) {
+                    cont.resume(false)
+                    return@sc
+                }
+                if (result !== POLL_EMPTY) {
+                    cont.resume(true)
+                    return@sc
+                }
+            }
+        }
+
+        @Suppress("UNCHECKED_CAST")
+        suspend override fun next(): E {
+            if (result === POLL_CLOSED) throw ClosedReceiveChannelException()
+            if (result !== POLL_EMPTY) {
+                val value = this.result as E
+                this.result = POLL_EMPTY
+                return value
+            }
+            // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
+            return channel.receive()
+        }
+    }
+
+    protected interface Send {
+        val pollResult: Any? // E | POLL_CLOSED
+        fun tryResumeSend(): Any?
+        fun completeResumeSend(token: Any)
+    }
+
+    protected interface ReceiveOrClosed<in E> {
+        val offerResult: Int // OFFER_SUCCESS | OFFER_CLOSED
+        fun tryResumeReceive(value: E): Any?
+        fun completeResumeReceive(token: Any)
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private class SendElement(
+            val cont: CancellableContinuation<Unit>,
+            override val pollResult: Any?
+    ) : LockFreeLinkedListNode(), Send {
+        override fun tryResumeSend(): Any? = cont.tryResume(Unit)
+        override fun completeResumeSend(token: Any) = cont.completeResume(token)
+    }
+
+    private class Closed<in E> : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
+        override val offerResult get() = OFFER_CLOSED
+        override val pollResult get() = POLL_CLOSED
+        override fun tryResumeSend(): Boolean = true
+        override fun completeResumeSend(token: Any) {}
+        override fun tryResumeReceive(value: E): Any? = throw ClosedSendChannelException()
+        override fun completeResumeReceive(token: Any) = throw ClosedSendChannelException()
+    }
+
+    private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
+        override val offerResult get() = OFFER_SUCCESS
+        abstract fun resumeReceiveClosed()
+    }
+
+    private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
+        override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
+        override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+        override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
+    }
+
+    private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
+        override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
+        override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+        override fun resumeReceiveClosed() = cont.resume(null)
+    }
+
+    private class ReceiveHasNext<E>(
+            val iterator: Iterator<E>,
+            val cont: CancellableContinuation<Boolean>
+    ) : Receive<E>() {
+        override fun tryResumeReceive(value: E): Any? {
+            val token = cont.tryResume(true)
+            if (token != null) iterator.result = value
+            return token
+        }
+
+        override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+
+        override fun resumeReceiveClosed() {
+            val token = cont.tryResume(false)
+            if (token != null) {
+                iterator.result = POLL_CLOSED
+                cont.completeResume(token)
+            }
+        }
+    }
+}
+
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
new file mode 100644
index 0000000..cff2894
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -0,0 +1,99 @@
+package kotlinx.coroutines.experimental.channels
+
+import java.util.concurrent.locks.ReentrantLock
+
+/**
+ * Channel with array buffer of a fixed [capacity].
+ * Sender suspends only when buffer is fully and receiver suspends only when buffer is empty.
+ *
+ * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
+ * The lists of suspended senders or receivers are lock-free.
+ */
+public class ArrayChannel<E>(val capacity: Int) : AbstractChannel<E>() {
+    init {
+        check(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
+    }
+
+    private val lock = ReentrantLock()
+    private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
+    private var head: Int = 0
+    @Volatile
+    private var size: Int = 0
+
+    private inline fun <T> locked(block: () -> T): T {
+        lock.lock()
+        return try { block() }
+        finally { lock.unlock() }
+    }
+
+    override val hasBuffer: Boolean get() = true
+    override val isBufferEmpty: Boolean get() = size == 0
+    override val isBufferFull: Boolean get() = size == capacity
+
+    override fun offerInternal(element: E): Int {
+        var token: Any? = null
+        var receive: ReceiveOrClosed<E>? = null
+        locked {
+            val size = this.size
+            if (isClosedForSend) return OFFER_CLOSED
+            if (size < capacity) {
+                // tentatively put element to buffer
+                this.size = size + 1 // update size before checking queue (!!!)
+                // check for receivers that were waiting on empty queue
+                if (size == 0) {
+                    while (true) {
+                        receive = takeFirstReceiveOrPeekClosed() ?: break // break when no receivers queued
+                        token = receive!!.tryResumeReceive(element)
+                        if (token != null) {
+                            this.size = size // restore size
+                            return@locked
+                        }
+                    }
+                }
+                buffer[(head + size) % capacity] = element // actually queue element
+                return OFFER_SUCCESS
+            }
+            // size == capacity: full
+            return OFFER_FAILED
+        }
+        // breaks here if offer meets receiver
+        receive!!.completeResumeReceive(token!!)
+        return receive!!.offerResult
+    }
+
+    // result is `E | POLL_EMPTY | POLL_CLOSED`
+    override fun pollInternal(): Any? {
+        var token: Any? = null
+        var send: Send? = null
+        var result: Any? = null
+        locked {
+            val size = this.size
+            if (size == 0) return if (isClosedTokenFirstInQueue) POLL_CLOSED else POLL_EMPTY
+            // size > 0: not empty -- retrieve element
+            result = buffer[head]
+            buffer[head] = null
+            this.size = size - 1 // update size before checking queue (!!!)
+            // check for senders that were waiting on full queue
+            var replacement: Any? = POLL_EMPTY
+            if (size == capacity) {
+                while (true) {
+                    send = takeFirstSendOrPeekClosed() ?: break
+                    token = send!!.tryResumeSend()
+                    if (token != null) {
+                        replacement = send!!.pollResult
+                        break
+                    }
+                }
+            }
+            if (replacement !== POLL_EMPTY && replacement !== POLL_CLOSED) {
+                this.size = size // restore size
+                buffer[(head + size) % capacity] = replacement
+            }
+            head = (head + 1) % capacity
+        }
+        // complete send the we're taken replacement from
+        if (token != null)
+            send!!.completeResumeSend(token!!)
+        return result
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
index ef96c21..fae4060 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -102,7 +102,7 @@
      * Retrieves and removes the head of this queue, or returns `null` if this queue [isEmpty]
      * or [isClosedForReceive].
      */
-    public fun pool(): E?
+    public fun poll(): E?
 
     /**
      * Returns new iterator to receive elements from this channels using `for` loop.
@@ -153,7 +153,23 @@
  * Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
  * but it has suspending operations instead of blocking ones and it can be closed.
  */
-public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
+public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
+    /**
+     * Factory for channels.
+     */
+    public companion object Factory {
+        /**
+         * Creates a channel with specified buffer capacity (or without a buffer by default).
+         */
+        public operator fun <E> invoke(capacity: Int = 0): Channel<E> {
+            check(capacity >= 0) { "Channel capacity cannot be negative, but $capacity was specified" }
+            return if (capacity == 0)
+                RendezvousChannel()
+            else
+                ArrayChannel(capacity)
+        }
+    }
+}
 
 /**
  * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel.
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
index 57095cc..645228e 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -1,283 +1,39 @@
 package kotlinx.coroutines.experimental.channels
 
-import kotlinx.coroutines.experimental.CancellableContinuation
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
-import kotlinx.coroutines.experimental.removeOnCompletion
-import kotlinx.coroutines.experimental.suspendCancellableCoroutine
-
 /**
  * Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender
  * to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
  * until another coroutine invokes [receive] and [receive] suspends until another coroutine invokes [send].
+ *
+ * This implementation is fully lock-free.
  */
-public class RendezvousChannel<E> : Channel<E> {
-    private val queue = LockFreeLinkedListHead()
+public class RendezvousChannel<E> : AbstractChannel<E>() {
 
-    // ------ SendChannel ------
+    override val hasBuffer: Boolean get() = false
+    override val isBufferEmpty: Boolean get() = true
+    override val isBufferFull: Boolean get() = true
 
-    override val isClosedForSend: Boolean get() = queue.prev() is Closed<*>
-
-    override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*>
-
-    suspend override fun send(element: E) {
-        // fast path if receive is already waiting for rendezvous
-        while (true) { // loop while there are receive waiters
-            val receive = takeFirstReceiveOrPeekClosed() ?: break
-            if (receive.tryResumeReceiveFromSend(element)) return // resumed it successfully
-        }
-        // slow-path does suspend
-        return suspendCancellableCoroutine(true) sc@ { cont ->
-            while (true) {
-                val send = SendElement(cont, element) // must allocate fresh element on each loop
-                if (queue.addLastIfPrev(send) { it !is Receive<*>  && it !is Closed<*> }) {
-                    cont.initCancellability() // make it properly cancellable
-                    cont.removeOnCompletion(send)
-                    return@sc
-                }
-                // hm... there are already receivers (maybe), so try taking first
-                takeFirstReceiveOrPeekClosed()?.also { receive ->
-                    if (receive.tryResumeReceiveFromSend(element, cont)) return@sc
-                }
-            }
-        }
-    }
-
-    override fun offer(element: E): Boolean {
-        takeFirstReceiveOrPeekClosed()?.apply {
-            tryResumeReceiveFromSend(element)
-            return true
-        }
-        return false
-    }
-
-    override fun close() {
+    // result is `OFFER_SUCCESS | OFFER_FAILED | OFFER_CLOSED`
+    override fun offerInternal(element: E): Int {
         while (true) {
-            val receive = takeFirstReceiveOrPeekClosed()
-            if (receive == null) {
-                // queue empty or has only senders -- try add last "Closed" item to the queue
-                if (queue.addLastIfPrev(Closed<E>()) { it !is Closed<*> }) return
-            }
-            if (receive is Closed<*>) return // already marked as closed -- nothing to do
-            receive as Receive<E> // type assertion
-            receive.resumeReceiveClosed()
-        }
-    }
-
-    private fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
-        queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> }
-
-    // ------ ReceiveChannel ------
-
-    override val isClosedForReceive: Boolean get() = queue.next() is Closed<*>
-
-    override val isEmpty: Boolean get() = queue.next() !is Send<*>
-
-    suspend override fun receive(): E {
-        // fast path if send is already waiting for rendezvous or closed
-        while (true) { // loop while there are send waiters
-            val send = takeFirstSendOrPeekClosed() ?: break
-            if (send.tryResumeSend()) return send.element // resumed it successfully
-        }
-        // slow-path does suspend
-        return suspendCancellableCoroutine(true) sc@ { cont ->
-            while (true) {
-                val receive = ReceiveNonNull(cont) // must allocate fresh element on each loop
-                if (queue.addLastIfPrev(receive) { it !is Send<*> }) {
-                    cont.initCancellability() // make it properly cancellable
-                    cont.removeOnCompletion(receive)
-                    return@sc
-                }
-                // hm... there are already senders (maybe), so try taking first
-                takeFirstSendOrPeekClosed()?.also { send ->
-                    if (send.tryResumeSend()) {
-                        send.resumeWithElement(cont)
-                        return@sc
-                    }
-                }
+            val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
+            val token = receive.tryResumeReceive(element)
+            if (token != null) {
+                receive.completeResumeReceive(token)
+                return receive.offerResult
             }
         }
     }
 
-    suspend override fun receiveOrNull(): E? {
-        // fast path if send is already waiting for rendezvous or closed
-        while (true) { // loop while there are send waiters
-            val send = takeFirstSendOrPeekClosed() ?: break
-            if (send.tryResumeSend()) return send.elementOrNull // resumed it successfully
-        }
-        // slow-path does suspend
-        return suspendCancellableCoroutine(true) sc@ { cont ->
-            while (true) {
-                val receive = ReceiveOrNull(cont) // must allocate fresh element on each loop
-                if (queue.addLastIfPrev(receive) { it !is Send<*> }) {
-                    cont.initCancellability() // make it properly cancellable
-                    cont.removeOnCompletion(receive)
-                    return@sc
-                }
-                // hm... there are already senders (maybe), so try taking first
-                takeFirstSendOrPeekClosed()?.also { send ->
-                    if (send.tryResumeSend()) {
-                        send.resumeWithElementOrNull(cont)
-                        return@sc
-                    }
-                }
-            }
-        }
-    }
-
-    override fun pool(): E? {
+    // result is `E | POLL_EMPTY | POLL_CLOSED`
+    override fun pollInternal(): Any? {
         while (true) {
-            val waiter = takeFirstSendOrPeekClosed() ?: return null
-            if (waiter.tryResumeSend()) return waiter.element
-        }
-    }
-
-    override fun iterator(): ChannelIterator<E> = Iterator(this)
-
-    private fun takeFirstSendOrPeekClosed(): Send<E>? =
-        queue.removeFirstIfIsInstanceOfOrPeekIf<Send<E>> { it is Closed<*> }
-
-    private companion object {
-        val IS_UNKNOWN = 0
-        val IS_HAS_VALUE = 1
-        val IS_CLOSED = 2
-    }
-
-    private class Iterator<E>(val channel: RendezvousChannel<E>) : ChannelIterator<E> {
-        var state: Int = IS_UNKNOWN
-        var value: E? = null
-
-        suspend override fun hasNext(): Boolean {
-            when (state) {
-                IS_HAS_VALUE -> return true
-                IS_CLOSED -> return false
+            val send = takeFirstSendOrPeekClosed() ?: return POLL_EMPTY
+            val token = send.tryResumeSend()
+            if (token != null) {
+                send.completeResumeSend(token)
+                return send.pollResult
             }
-            // fast path if send is already waiting for rendezvous
-            while (true) { // loop while there are send waiters
-                val send = channel.takeFirstSendOrPeekClosed() ?: break
-                if (send.tryResumeSend()) return updateStateWithSend(send)
-            }
-            // slow-path does suspend
-            return suspendCancellableCoroutine(true) sc@ { cont ->
-                while (true) {
-                    val receive = ReceiveHasNext(this, cont) // must allocate fresh element on each loop
-                    if (channel.queue.addLastIfPrev(receive) { it !is Send<*> }) {
-                        cont.initCancellability() // make it properly cancellable
-                        cont.removeOnCompletion(receive)
-                        return@sc
-                    }
-                    // hm... there are already senders (maybe), so try taking first
-                    channel.takeFirstSendOrPeekClosed()?.also { send ->
-                        if (send.tryResumeSend()) {
-                            cont.resume(updateStateWithSend(send))
-                            return@sc
-                        }
-                    }
-                }
-            }
-        }
-
-        private fun updateStateWithSend(send: Send<E>): Boolean {
-            if (send is Closed<*>) {
-                state = IS_CLOSED
-                return false
-            } else {
-                state = IS_HAS_VALUE
-                value = send.element
-                return true
-            }
-        }
-
-        suspend override fun next(): E {
-            when (state) {
-                IS_HAS_VALUE -> {
-                    val value = this.value as E
-                    this.state = IS_UNKNOWN
-                    this.value = null
-                    return value
-                }
-                IS_CLOSED -> throw ClosedReceiveChannelException()
-            }
-            // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as IS_UNKNOWN)
-            return channel.receive()
-        }
-    }
-
-    private abstract class Send<out E> : LockFreeLinkedListNode() {
-        abstract val element: E
-        abstract val elementOrNull: E?
-        abstract fun tryResumeSend(): Boolean
-        abstract fun resumeWithElement(cont: CancellableContinuation<E>)
-        abstract fun resumeWithElementOrNull(cont: CancellableContinuation<E?>)
-    }
-
-    private class SendElement<out E>(
-            val cont: CancellableContinuation<Unit>,
-            override val element: E
-    ) : Send<E>() {
-        override val elementOrNull: E? get() = element
-        override fun tryResumeSend(): Boolean = cont.tryResume(Unit)
-        override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resume(element)
-        override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(element)
-    }
-
-    private interface ReceiveOrClosed<in E> {
-        fun tryResumeReceiveFromSend(value: E): Boolean
-        fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean
-    }
-
-    private class Closed<E> : Send<E>(), ReceiveOrClosed<E> {
-        override val element: E get() = throw ClosedReceiveChannelException()
-        override val elementOrNull: E? get() = null
-        override fun tryResumeSend(): Boolean = true
-        override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resumeWithException(ClosedReceiveChannelException())
-        override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(null)
-        override fun tryResumeReceiveFromSend(value: E): Boolean = throw ClosedSendChannelException()
-        override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
-            cont.resumeWithException(ClosedSendChannelException())
-            return true
-        }
-    }
-
-    private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
-        override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
-            if (tryResumeReceiveFromSend(value)) {
-                cont.resume(Unit)
-                return true
-            }
-            return false
-        }
-        abstract fun resumeReceiveClosed()
-    }
-
-    private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
-        override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value)
-        override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
-    }
-
-    private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
-        override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value)
-        override fun resumeReceiveClosed() = cont.resume(null)
-    }
-
-    private class ReceiveHasNext<E>(
-            val iterator: Iterator<E>,
-            val cont: CancellableContinuation<Boolean>
-    ) : Receive<E>(), (Any?) -> Unit {
-        override fun tryResumeReceiveFromSend(value: E): Boolean {
-            iterator.value = value // tentative value (may fail to resume with it)
-            return cont.tryResume(true, this)
-        }
-
-        override fun resumeReceiveClosed() {
-            cont.tryResume(false, this)
-        }
-
-        // callback for tryResume onSuccess
-        override fun invoke(hasNext: Any?) {
-            hasNext as Boolean // try assertion -- that is the value we pass
-            iterator.state = if (hasNext) IS_HAS_VALUE else IS_CLOSED
-            if (!hasNext) iterator.value = null // cleanup tentative value
         }
     }
 }