ArrayChannel implementation and tests
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 4daf5f1..1b1adad 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -22,13 +22,16 @@
     val isCancelled: Boolean
 
     /**
-     * Tries to resume this continuation with a given value and returns `true` if it was successful,
-     * or `false` otherwise (it was already resumed or cancelled).
-     *
-     * An optional [onSuccess] callback is invoked with [value] as its parameter after the state of this continuation
-     * is updated (so that is cannot be cancelled anymore), but before it is actually resumed.
+     * Tries to resume this continuation with a given value and returns non-null object token if it was successful,
+     * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
+     * [completeResume] must be invoked with it.
      */
-    fun tryResume(value: T, onSuccess: ((Any?) -> Unit)? = null): Boolean
+    fun tryResume(value: T): Any?
+
+    /**
+     * Completes the execution of [tryResume] on its non-null result.
+     */
+    fun completeResume(token: Any)
 
     /**
      * Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
@@ -104,16 +107,20 @@
     override val isCancelled: Boolean
         get() = getState() is Cancelled
 
-    override fun tryResume(value: T, onSuccess: ((Any?) -> Unit)?): Boolean {
+    override fun tryResume(value: T): Any? {
         while (true) { // lock-free loop on state
             val state = getState() // atomic read
             when (state) {
-                is Active -> if (updateState(state, value, onSuccess)) return true
-                else -> return false // cannot resume -- not active anymore
+                is Active -> if (tryUpdateState(state, value)) return state
+                else -> return null // cannot resume -- not active anymore
             }
         }
     }
 
+    override fun completeResume(token: Any) {
+        completeUpdateState(token, getState())
+    }
+
     @Suppress("UNCHECKED_CAST")
     override fun afterCompletion(state: Any?) {
         val decision = this.decision // volatile read
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index d34edc8..9147ede 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -213,16 +213,24 @@
     /**
      * Tries to update current [state][getState] of this job.
      */
-    fun updateState(expect: Any, update: Any?, onSuccess: ((Any?) -> Unit)? = null): Boolean {
+    fun updateState(expect: Any, update: Any?): Boolean {
+        if (!tryUpdateState(expect, update)) return false
+        completeUpdateState(expect, update)
+        return true
+    }
+
+    fun tryUpdateState(expect: Any, update: Any?): Boolean  {
         require(expect is Active && update !is Active) // only active -> inactive transition is allowed
         if (!STATE.compareAndSet(this, expect, update)) return false
         // #1. Update linked state before invoking completion handlers
         onStateUpdate(update)
         // #2. Unregister from parent job
         registration?.unregister() // volatile read registration _after_ state was updated
-        // #3. Additional (optional) callback
-        onSuccess?.invoke(update)
-        // #4. Invoke completion handlers
+        return true // continues in completeUpdateState
+    }
+
+    fun completeUpdateState(expect: Any, update: Any?) {
+        // #3. Invoke completion handlers
         val reason = (update as? CompletedExceptionally)?.cancelReason
         var completionException: Throwable? = null
         when (expect) {
@@ -244,10 +252,9 @@
             // otherwise -- do nothing (Empty)
             else -> check(expect == Empty)
         }
-        // #5. Do other (overridable) processing after completion handlers
+        // #4. Do other (overridable) processing after completion handlers
         completionException?.let { handleCompletionException(it) }
         afterCompletion(update)
-        return true
     }
 
     final override val isActive: Boolean get() = state is Active
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
new file mode 100644
index 0000000..7cd231f
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -0,0 +1,317 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CancellableContinuation
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import kotlinx.coroutines.experimental.removeOnCompletion
+import kotlinx.coroutines.experimental.suspendCancellableCoroutine
+
+/**
+ * Abstract channel. It is a base class for buffered and unbuffered channels.
+ */
+public abstract class AbstractChannel<E> : Channel<E> {
+    private val queue = LockFreeLinkedListHead()
+
+    // ------ extension points for buffered channels ------
+
+    protected abstract val hasBuffer: Boolean
+    protected abstract val isBufferEmpty: Boolean
+    protected abstract val isBufferFull: Boolean
+
+    /**
+     * Tries to add element to buffer or to queued receiver.
+     * Return type is `OFFER_SUCCESS | OFFER_FAILED | OFFER_CLOSED`.
+     */
+    protected abstract fun offerInternal(element: E): Int
+
+    /**
+     * Tries to remove element from buffer or from queued sender.
+     * Return type is `E | POLL_EMPTY | POLL_CLOSED`
+     */
+    protected abstract fun pollInternal(): Any?
+
+
+    // ------ state function for concrete implementations ------
+
+    protected val isClosedTokenFirstInQueue: Boolean get() = queue.next() is Closed<*>
+
+    // ------ SendChannel ------
+
+    override val isClosedForSend: Boolean get() = queue.prev() is Closed<*>
+    override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*> && isBufferFull
+
+    suspend override fun send(element: E) {
+        // fast path -- try offer non-blocking
+        if (offer(element)) return
+        // slow-path does suspend
+        return sendSuspend(element)
+    }
+
+    override fun offer(element: E): Boolean =
+        when (offerInternal(element)) {
+            OFFER_SUCCESS -> true
+            OFFER_FAILED -> false
+            else -> throw ClosedSendChannelException()
+        }
+
+    private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutine(true) sc@ { cont ->
+        val send = SendElement(cont, element)
+        loop@ while (true) {
+            if (enqueueSend(send)) {
+                cont.initCancellability() // make it properly cancellable
+                cont.removeOnCompletion(send)
+                return@sc
+            }
+            // hm... something is not right. try to offer
+            when (offerInternal(element)) {
+                OFFER_SUCCESS -> {
+                    cont.resume(Unit)
+                    return@sc
+                }
+                OFFER_CLOSED -> {
+                    cont.resumeWithException(ClosedSendChannelException())
+                    return@sc
+                }
+            }
+        }
+    }
+
+    private fun enqueueSend(send: SendElement) =
+        if (hasBuffer)
+            queue.addLastIfPrevAndIf(send, { it !is ReceiveOrClosed<*> }, { isBufferFull })
+        else
+            queue.addLastIfPrev(send, { it !is ReceiveOrClosed<*> })
+
+    override fun close() {
+        while (true) {
+            val receive = takeFirstReceiveOrPeekClosed()
+            if (receive == null) {
+                // queue empty or has only senders -- try add last "Closed" item to the queue
+                if (queue.addLastIfPrev(Closed<E>(), { it !is ReceiveOrClosed<*> })) return
+                continue // retry on failure
+            }
+            if (receive is Closed<*>) return // already marked as closed -- nothing to do
+            receive as Receive<E> // type assertion
+            receive.resumeReceiveClosed()
+        }
+    }
+
+    protected fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> }
+
+    // ------ ReceiveChannel ------
+
+    override val isClosedForReceive: Boolean get() = isClosedTokenFirstInQueue && isBufferEmpty
+    override val isEmpty: Boolean get() = queue.next() !is Send && isBufferEmpty
+
+    @Suppress("UNCHECKED_CAST")
+    suspend override fun receive(): E {
+        // fast path -- try poll non-blocking
+        val result = pollInternal()
+        if (result === POLL_CLOSED) throw ClosedReceiveChannelException()
+        if (result !== POLL_EMPTY) return result as E
+        // slow-path does suspend
+        return receiveSuspend()
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private suspend fun receiveSuspend(): E = suspendCancellableCoroutine(true) sc@ { cont ->
+        val receive = ReceiveNonNull(cont)
+        while (true) {
+            if (enqueueReceive(receive)) {
+                cont.initCancellability() // make it properly cancellable
+                cont.removeOnCompletion(receive)
+                return@sc
+            }
+            // hm... something is not right. try to poll
+            val result = pollInternal()
+            if (result === POLL_CLOSED) {
+                cont.resumeWithException(ClosedReceiveChannelException())
+                return@sc
+            }
+            if (result !== POLL_EMPTY) {
+                cont.resume(result as E)
+                return@sc
+            }
+        }
+    }
+
+    private fun enqueueReceive(receive: Receive<E>) =
+        if (hasBuffer)
+            queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
+        else
+            queue.addLastIfPrev(receive, { it !is Send })
+
+    @Suppress("UNCHECKED_CAST")
+    suspend override fun receiveOrNull(): E? {
+        // fast path -- try poll non-blocking
+        val result = pollInternal()
+        if (result === POLL_CLOSED) return null
+        if (result !== POLL_EMPTY) return result as E
+        // slow-path does suspend
+        return receiveOrNullSuspend()
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private suspend fun receiveOrNullSuspend(): E? = suspendCancellableCoroutine(true) sc@ { cont ->
+        val receive = ReceiveOrNull(cont)
+        while (true) {
+            if (enqueueReceive(receive)) {
+                cont.initCancellability() // make it properly cancellable
+                cont.removeOnCompletion(receive)
+                return@sc
+            }
+            // hm... something is not right. try to poll
+            val result = pollInternal()
+            if (result === POLL_CLOSED) {
+                cont.resume(null)
+                return@sc
+            }
+            if (result !== POLL_EMPTY) {
+                cont.resume(result as E)
+                return@sc
+            }
+        }
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    override fun poll(): E? {
+        val result = pollInternal()
+        return if (result === POLL_EMPTY || result === POLL_CLOSED) null else result as E
+    }
+
+    override fun iterator(): ChannelIterator<E> = Iterator(this)
+
+    protected fun takeFirstSendOrPeekClosed(): Send? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
+
+    protected companion object {
+        const val OFFER_SUCCESS = 0
+        const val OFFER_FAILED = 1
+        const val OFFER_CLOSED = 2
+
+        val POLL_EMPTY: Any = Marker("POLL_EMPTY")
+        val POLL_CLOSED: Any = Marker("POLL_CLOSED")
+    }
+
+    // for debugging
+    private class Marker(val string: String) {
+        override fun toString(): String = string
+    }
+
+    private class Iterator<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
+        var result: Any? = POLL_EMPTY // E | POLL_CLOSED | POLL_EMPTY
+
+        suspend override fun hasNext(): Boolean {
+            // check for repeated hasNext
+            if (result !== POLL_EMPTY) return result !== POLL_CLOSED
+            // fast path -- try poll non-blocking
+            result = channel.pollInternal()
+            if (result !== POLL_EMPTY) return result !== POLL_CLOSED
+            // slow-path does suspend
+            return hasNextSuspend()
+        }
+
+        private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutine(true) sc@ { cont ->
+            val receive = ReceiveHasNext(this, cont)
+            while (true) {
+                if (channel.enqueueReceive(receive)) {
+                    cont.initCancellability() // make it properly cancellable
+                    cont.removeOnCompletion(receive)
+                    return@sc
+                }
+                // hm... something is not right. try to poll
+                result = channel.pollInternal()
+                if (result === POLL_CLOSED) {
+                    cont.resume(false)
+                    return@sc
+                }
+                if (result !== POLL_EMPTY) {
+                    cont.resume(true)
+                    return@sc
+                }
+            }
+        }
+
+        @Suppress("UNCHECKED_CAST")
+        suspend override fun next(): E {
+            if (result === POLL_CLOSED) throw ClosedReceiveChannelException()
+            if (result !== POLL_EMPTY) {
+                val value = this.result as E
+                this.result = POLL_EMPTY
+                return value
+            }
+            // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
+            return channel.receive()
+        }
+    }
+
+    protected interface Send {
+        val pollResult: Any? // E | POLL_CLOSED
+        fun tryResumeSend(): Any?
+        fun completeResumeSend(token: Any)
+    }
+
+    protected interface ReceiveOrClosed<in E> {
+        val offerResult: Int // OFFER_SUCCESS | OFFER_CLOSED
+        fun tryResumeReceive(value: E): Any?
+        fun completeResumeReceive(token: Any)
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private class SendElement(
+            val cont: CancellableContinuation<Unit>,
+            override val pollResult: Any?
+    ) : LockFreeLinkedListNode(), Send {
+        override fun tryResumeSend(): Any? = cont.tryResume(Unit)
+        override fun completeResumeSend(token: Any) = cont.completeResume(token)
+    }
+
+    private class Closed<in E> : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
+        override val offerResult get() = OFFER_CLOSED
+        override val pollResult get() = POLL_CLOSED
+        override fun tryResumeSend(): Boolean = true
+        override fun completeResumeSend(token: Any) {}
+        override fun tryResumeReceive(value: E): Any? = throw ClosedSendChannelException()
+        override fun completeResumeReceive(token: Any) = throw ClosedSendChannelException()
+    }
+
+    private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
+        override val offerResult get() = OFFER_SUCCESS
+        abstract fun resumeReceiveClosed()
+    }
+
+    private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
+        override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
+        override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+        override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
+    }
+
+    private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
+        override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
+        override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+        override fun resumeReceiveClosed() = cont.resume(null)
+    }
+
+    private class ReceiveHasNext<E>(
+            val iterator: Iterator<E>,
+            val cont: CancellableContinuation<Boolean>
+    ) : Receive<E>() {
+        override fun tryResumeReceive(value: E): Any? {
+            val token = cont.tryResume(true)
+            if (token != null) iterator.result = value
+            return token
+        }
+
+        override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+
+        override fun resumeReceiveClosed() {
+            val token = cont.tryResume(false)
+            if (token != null) {
+                iterator.result = POLL_CLOSED
+                cont.completeResume(token)
+            }
+        }
+    }
+}
+
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
new file mode 100644
index 0000000..cff2894
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -0,0 +1,99 @@
+package kotlinx.coroutines.experimental.channels
+
+import java.util.concurrent.locks.ReentrantLock
+
+/**
+ * Channel with array buffer of a fixed [capacity].
+ * Sender suspends only when buffer is fully and receiver suspends only when buffer is empty.
+ *
+ * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
+ * The lists of suspended senders or receivers are lock-free.
+ */
+public class ArrayChannel<E>(val capacity: Int) : AbstractChannel<E>() {
+    init {
+        check(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
+    }
+
+    private val lock = ReentrantLock()
+    private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
+    private var head: Int = 0
+    @Volatile
+    private var size: Int = 0
+
+    private inline fun <T> locked(block: () -> T): T {
+        lock.lock()
+        return try { block() }
+        finally { lock.unlock() }
+    }
+
+    override val hasBuffer: Boolean get() = true
+    override val isBufferEmpty: Boolean get() = size == 0
+    override val isBufferFull: Boolean get() = size == capacity
+
+    override fun offerInternal(element: E): Int {
+        var token: Any? = null
+        var receive: ReceiveOrClosed<E>? = null
+        locked {
+            val size = this.size
+            if (isClosedForSend) return OFFER_CLOSED
+            if (size < capacity) {
+                // tentatively put element to buffer
+                this.size = size + 1 // update size before checking queue (!!!)
+                // check for receivers that were waiting on empty queue
+                if (size == 0) {
+                    while (true) {
+                        receive = takeFirstReceiveOrPeekClosed() ?: break // break when no receivers queued
+                        token = receive!!.tryResumeReceive(element)
+                        if (token != null) {
+                            this.size = size // restore size
+                            return@locked
+                        }
+                    }
+                }
+                buffer[(head + size) % capacity] = element // actually queue element
+                return OFFER_SUCCESS
+            }
+            // size == capacity: full
+            return OFFER_FAILED
+        }
+        // breaks here if offer meets receiver
+        receive!!.completeResumeReceive(token!!)
+        return receive!!.offerResult
+    }
+
+    // result is `E | POLL_EMPTY | POLL_CLOSED`
+    override fun pollInternal(): Any? {
+        var token: Any? = null
+        var send: Send? = null
+        var result: Any? = null
+        locked {
+            val size = this.size
+            if (size == 0) return if (isClosedTokenFirstInQueue) POLL_CLOSED else POLL_EMPTY
+            // size > 0: not empty -- retrieve element
+            result = buffer[head]
+            buffer[head] = null
+            this.size = size - 1 // update size before checking queue (!!!)
+            // check for senders that were waiting on full queue
+            var replacement: Any? = POLL_EMPTY
+            if (size == capacity) {
+                while (true) {
+                    send = takeFirstSendOrPeekClosed() ?: break
+                    token = send!!.tryResumeSend()
+                    if (token != null) {
+                        replacement = send!!.pollResult
+                        break
+                    }
+                }
+            }
+            if (replacement !== POLL_EMPTY && replacement !== POLL_CLOSED) {
+                this.size = size // restore size
+                buffer[(head + size) % capacity] = replacement
+            }
+            head = (head + 1) % capacity
+        }
+        // complete send the we're taken replacement from
+        if (token != null)
+            send!!.completeResumeSend(token!!)
+        return result
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
index ef96c21..fae4060 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -102,7 +102,7 @@
      * Retrieves and removes the head of this queue, or returns `null` if this queue [isEmpty]
      * or [isClosedForReceive].
      */
-    public fun pool(): E?
+    public fun poll(): E?
 
     /**
      * Returns new iterator to receive elements from this channels using `for` loop.
@@ -153,7 +153,23 @@
  * Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
  * but it has suspending operations instead of blocking ones and it can be closed.
  */
-public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
+public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
+    /**
+     * Factory for channels.
+     */
+    public companion object Factory {
+        /**
+         * Creates a channel with specified buffer capacity (or without a buffer by default).
+         */
+        public operator fun <E> invoke(capacity: Int = 0): Channel<E> {
+            check(capacity >= 0) { "Channel capacity cannot be negative, but $capacity was specified" }
+            return if (capacity == 0)
+                RendezvousChannel()
+            else
+                ArrayChannel(capacity)
+        }
+    }
+}
 
 /**
  * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel.
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
index 57095cc..645228e 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -1,283 +1,39 @@
 package kotlinx.coroutines.experimental.channels
 
-import kotlinx.coroutines.experimental.CancellableContinuation
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
-import kotlinx.coroutines.experimental.removeOnCompletion
-import kotlinx.coroutines.experimental.suspendCancellableCoroutine
-
 /**
  * Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender
  * to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
  * until another coroutine invokes [receive] and [receive] suspends until another coroutine invokes [send].
+ *
+ * This implementation is fully lock-free.
  */
-public class RendezvousChannel<E> : Channel<E> {
-    private val queue = LockFreeLinkedListHead()
+public class RendezvousChannel<E> : AbstractChannel<E>() {
 
-    // ------ SendChannel ------
+    override val hasBuffer: Boolean get() = false
+    override val isBufferEmpty: Boolean get() = true
+    override val isBufferFull: Boolean get() = true
 
-    override val isClosedForSend: Boolean get() = queue.prev() is Closed<*>
-
-    override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*>
-
-    suspend override fun send(element: E) {
-        // fast path if receive is already waiting for rendezvous
-        while (true) { // loop while there are receive waiters
-            val receive = takeFirstReceiveOrPeekClosed() ?: break
-            if (receive.tryResumeReceiveFromSend(element)) return // resumed it successfully
-        }
-        // slow-path does suspend
-        return suspendCancellableCoroutine(true) sc@ { cont ->
-            while (true) {
-                val send = SendElement(cont, element) // must allocate fresh element on each loop
-                if (queue.addLastIfPrev(send) { it !is Receive<*>  && it !is Closed<*> }) {
-                    cont.initCancellability() // make it properly cancellable
-                    cont.removeOnCompletion(send)
-                    return@sc
-                }
-                // hm... there are already receivers (maybe), so try taking first
-                takeFirstReceiveOrPeekClosed()?.also { receive ->
-                    if (receive.tryResumeReceiveFromSend(element, cont)) return@sc
-                }
-            }
-        }
-    }
-
-    override fun offer(element: E): Boolean {
-        takeFirstReceiveOrPeekClosed()?.apply {
-            tryResumeReceiveFromSend(element)
-            return true
-        }
-        return false
-    }
-
-    override fun close() {
+    // result is `OFFER_SUCCESS | OFFER_FAILED | OFFER_CLOSED`
+    override fun offerInternal(element: E): Int {
         while (true) {
-            val receive = takeFirstReceiveOrPeekClosed()
-            if (receive == null) {
-                // queue empty or has only senders -- try add last "Closed" item to the queue
-                if (queue.addLastIfPrev(Closed<E>()) { it !is Closed<*> }) return
-            }
-            if (receive is Closed<*>) return // already marked as closed -- nothing to do
-            receive as Receive<E> // type assertion
-            receive.resumeReceiveClosed()
-        }
-    }
-
-    private fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
-        queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> }
-
-    // ------ ReceiveChannel ------
-
-    override val isClosedForReceive: Boolean get() = queue.next() is Closed<*>
-
-    override val isEmpty: Boolean get() = queue.next() !is Send<*>
-
-    suspend override fun receive(): E {
-        // fast path if send is already waiting for rendezvous or closed
-        while (true) { // loop while there are send waiters
-            val send = takeFirstSendOrPeekClosed() ?: break
-            if (send.tryResumeSend()) return send.element // resumed it successfully
-        }
-        // slow-path does suspend
-        return suspendCancellableCoroutine(true) sc@ { cont ->
-            while (true) {
-                val receive = ReceiveNonNull(cont) // must allocate fresh element on each loop
-                if (queue.addLastIfPrev(receive) { it !is Send<*> }) {
-                    cont.initCancellability() // make it properly cancellable
-                    cont.removeOnCompletion(receive)
-                    return@sc
-                }
-                // hm... there are already senders (maybe), so try taking first
-                takeFirstSendOrPeekClosed()?.also { send ->
-                    if (send.tryResumeSend()) {
-                        send.resumeWithElement(cont)
-                        return@sc
-                    }
-                }
+            val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
+            val token = receive.tryResumeReceive(element)
+            if (token != null) {
+                receive.completeResumeReceive(token)
+                return receive.offerResult
             }
         }
     }
 
-    suspend override fun receiveOrNull(): E? {
-        // fast path if send is already waiting for rendezvous or closed
-        while (true) { // loop while there are send waiters
-            val send = takeFirstSendOrPeekClosed() ?: break
-            if (send.tryResumeSend()) return send.elementOrNull // resumed it successfully
-        }
-        // slow-path does suspend
-        return suspendCancellableCoroutine(true) sc@ { cont ->
-            while (true) {
-                val receive = ReceiveOrNull(cont) // must allocate fresh element on each loop
-                if (queue.addLastIfPrev(receive) { it !is Send<*> }) {
-                    cont.initCancellability() // make it properly cancellable
-                    cont.removeOnCompletion(receive)
-                    return@sc
-                }
-                // hm... there are already senders (maybe), so try taking first
-                takeFirstSendOrPeekClosed()?.also { send ->
-                    if (send.tryResumeSend()) {
-                        send.resumeWithElementOrNull(cont)
-                        return@sc
-                    }
-                }
-            }
-        }
-    }
-
-    override fun pool(): E? {
+    // result is `E | POLL_EMPTY | POLL_CLOSED`
+    override fun pollInternal(): Any? {
         while (true) {
-            val waiter = takeFirstSendOrPeekClosed() ?: return null
-            if (waiter.tryResumeSend()) return waiter.element
-        }
-    }
-
-    override fun iterator(): ChannelIterator<E> = Iterator(this)
-
-    private fun takeFirstSendOrPeekClosed(): Send<E>? =
-        queue.removeFirstIfIsInstanceOfOrPeekIf<Send<E>> { it is Closed<*> }
-
-    private companion object {
-        val IS_UNKNOWN = 0
-        val IS_HAS_VALUE = 1
-        val IS_CLOSED = 2
-    }
-
-    private class Iterator<E>(val channel: RendezvousChannel<E>) : ChannelIterator<E> {
-        var state: Int = IS_UNKNOWN
-        var value: E? = null
-
-        suspend override fun hasNext(): Boolean {
-            when (state) {
-                IS_HAS_VALUE -> return true
-                IS_CLOSED -> return false
+            val send = takeFirstSendOrPeekClosed() ?: return POLL_EMPTY
+            val token = send.tryResumeSend()
+            if (token != null) {
+                send.completeResumeSend(token)
+                return send.pollResult
             }
-            // fast path if send is already waiting for rendezvous
-            while (true) { // loop while there are send waiters
-                val send = channel.takeFirstSendOrPeekClosed() ?: break
-                if (send.tryResumeSend()) return updateStateWithSend(send)
-            }
-            // slow-path does suspend
-            return suspendCancellableCoroutine(true) sc@ { cont ->
-                while (true) {
-                    val receive = ReceiveHasNext(this, cont) // must allocate fresh element on each loop
-                    if (channel.queue.addLastIfPrev(receive) { it !is Send<*> }) {
-                        cont.initCancellability() // make it properly cancellable
-                        cont.removeOnCompletion(receive)
-                        return@sc
-                    }
-                    // hm... there are already senders (maybe), so try taking first
-                    channel.takeFirstSendOrPeekClosed()?.also { send ->
-                        if (send.tryResumeSend()) {
-                            cont.resume(updateStateWithSend(send))
-                            return@sc
-                        }
-                    }
-                }
-            }
-        }
-
-        private fun updateStateWithSend(send: Send<E>): Boolean {
-            if (send is Closed<*>) {
-                state = IS_CLOSED
-                return false
-            } else {
-                state = IS_HAS_VALUE
-                value = send.element
-                return true
-            }
-        }
-
-        suspend override fun next(): E {
-            when (state) {
-                IS_HAS_VALUE -> {
-                    val value = this.value as E
-                    this.state = IS_UNKNOWN
-                    this.value = null
-                    return value
-                }
-                IS_CLOSED -> throw ClosedReceiveChannelException()
-            }
-            // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as IS_UNKNOWN)
-            return channel.receive()
-        }
-    }
-
-    private abstract class Send<out E> : LockFreeLinkedListNode() {
-        abstract val element: E
-        abstract val elementOrNull: E?
-        abstract fun tryResumeSend(): Boolean
-        abstract fun resumeWithElement(cont: CancellableContinuation<E>)
-        abstract fun resumeWithElementOrNull(cont: CancellableContinuation<E?>)
-    }
-
-    private class SendElement<out E>(
-            val cont: CancellableContinuation<Unit>,
-            override val element: E
-    ) : Send<E>() {
-        override val elementOrNull: E? get() = element
-        override fun tryResumeSend(): Boolean = cont.tryResume(Unit)
-        override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resume(element)
-        override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(element)
-    }
-
-    private interface ReceiveOrClosed<in E> {
-        fun tryResumeReceiveFromSend(value: E): Boolean
-        fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean
-    }
-
-    private class Closed<E> : Send<E>(), ReceiveOrClosed<E> {
-        override val element: E get() = throw ClosedReceiveChannelException()
-        override val elementOrNull: E? get() = null
-        override fun tryResumeSend(): Boolean = true
-        override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resumeWithException(ClosedReceiveChannelException())
-        override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(null)
-        override fun tryResumeReceiveFromSend(value: E): Boolean = throw ClosedSendChannelException()
-        override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
-            cont.resumeWithException(ClosedSendChannelException())
-            return true
-        }
-    }
-
-    private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
-        override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
-            if (tryResumeReceiveFromSend(value)) {
-                cont.resume(Unit)
-                return true
-            }
-            return false
-        }
-        abstract fun resumeReceiveClosed()
-    }
-
-    private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
-        override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value)
-        override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
-    }
-
-    private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
-        override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value)
-        override fun resumeReceiveClosed() = cont.resume(null)
-    }
-
-    private class ReceiveHasNext<E>(
-            val iterator: Iterator<E>,
-            val cont: CancellableContinuation<Boolean>
-    ) : Receive<E>(), (Any?) -> Unit {
-        override fun tryResumeReceiveFromSend(value: E): Boolean {
-            iterator.value = value // tentative value (may fail to resume with it)
-            return cont.tryResume(true, this)
-        }
-
-        override fun resumeReceiveClosed() {
-            cont.tryResume(false, this)
-        }
-
-        // callback for tryResume onSuccess
-        override fun invoke(hasNext: Any?) {
-            hasNext as Boolean // try assertion -- that is the value we pass
-            iterator.state = if (hasNext) IS_HAS_VALUE else IS_CLOSED
-            if (!hasNext) iterator.value = null // cleanup tentative value
         }
     }
 }
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
new file mode 100644
index 0000000..2f837e5
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
@@ -0,0 +1,133 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+
+class ArrayChannelTest : TestBase() {
+    @Test
+    fun testSimple() = runBlocking {
+        val q = ArrayChannel<Int>(1)
+        check(q.isEmpty && !q.isFull)
+        expect(1)
+        val sender = launch(context) {
+            expect(4)
+            q.send(1) // success -- buffered
+            check(!q.isEmpty && q.isFull)
+            expect(5)
+            q.send(2) // suspends (buffer full)
+            expect(9)
+        }
+        expect(2)
+        val receiver = launch(context) {
+            expect(6)
+            check(q.receive() == 1) // does not suspend -- took from buffer
+            check(!q.isEmpty && q.isFull) // waiting sender's element moved to buffer
+            expect(7)
+            check(q.receive() == 2) // does not suspend (takes from sender)
+            expect(8)
+        }
+        expect(3)
+        sender.join()
+        receiver.join()
+        check(q.isEmpty && !q.isFull)
+        finish(10)
+    }
+
+    @Test
+    fun testStress() = runBlocking {
+        val n = 100_000
+        val q = ArrayChannel<Int>(1)
+        val sender = launch(context) {
+            for (i in 1..n) q.send(i)
+            expect(2)
+        }
+        val receiver = launch(context) {
+            for (i in 1..n) check(q.receive() == i)
+            expect(3)
+        }
+        expect(1)
+        sender.join()
+        receiver.join()
+        finish(4)
+    }
+
+    @Test
+    fun testClosedBufferedReceiveOrNull() = runBlocking {
+        val q = ArrayChannel<Int>(1)
+        check(q.isEmpty && !q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+        expect(1)
+        launch(context) {
+            expect(5)
+            check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+            assertEquals(42, q.receiveOrNull())
+            expect(6)
+            check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+            assertEquals(null, q.receiveOrNull())
+            expect(7)
+        }
+        expect(2)
+        q.send(42) // buffers
+        expect(3)
+        q.close() // goes on
+        expect(4)
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+        yield()
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+        finish(8)
+    }
+
+    @Test
+    fun testClosedExceptions() = runBlocking {
+        val q = ArrayChannel<Int>(1)
+        expect(1)
+        launch(context) {
+            expect(4)
+            try { q.receive() }
+            catch (e: ClosedReceiveChannelException) {
+                expect(5)
+            }
+        }
+        expect(2)
+        q.close()
+        expect(3)
+        yield()
+        expect(6)
+        try { q.send(42) }
+        catch (e: ClosedSendChannelException) {
+            finish(7)
+        }
+    }
+
+    @Test
+    fun testOfferAndPool() = runBlocking {
+        val q = ArrayChannel<Int>(1)
+        assertTrue(q.offer(1))
+        expect(1)
+        launch(context) {
+            expect(3)
+            assertEquals(1, q.poll())
+            expect(4)
+            assertEquals(null, q.poll())
+            expect(5)
+            assertEquals(2, q.receive()) // suspends
+            expect(9)
+            assertEquals(3, q.poll())
+            expect(10)
+            assertEquals(null, q.poll())
+            expect(11)
+        }
+        expect(2)
+        yield()
+        expect(6)
+        assertTrue(q.offer(2))
+        expect(7)
+        assertTrue(q.offer(3))
+        expect(8)
+        assertFalse(q.offer(4))
+        yield()
+        finish(12)
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
similarity index 86%
rename from kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
rename to kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
index d444a5d..a395ce4 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
@@ -2,13 +2,22 @@
 
 import kotlinx.coroutines.experimental.*
 import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
 import java.util.*
 import kotlin.test.assertEquals
 
-class RendezvousChannelAtomicityStressTest {
+@RunWith(Parameterized::class)
+class ChannelAtomicCancelStressTest(val kind: TestChannelKind) {
+    companion object {
+        @Parameterized.Parameters(name = "{0}")
+        @JvmStatic
+        fun params(): Collection<Array<Any>> = TestChannelKind.values().map { arrayOf<Any>(it) }
+    }
+
     val TEST_DURATION = 3000L
 
-    val channel = RendezvousChannel<Int>()
+    val channel = kind.create()
     val senderDone = RendezvousChannel<Boolean>()
     val receiverDone = RendezvousChannel<Boolean>()
 
@@ -25,7 +34,7 @@
     lateinit var receiver: Job
 
     @Test
-    fun testStress() = runBlocking {
+    fun testAtomicCancelStress() = runBlocking {
         val deadline = System.currentTimeMillis() + TEST_DURATION
         launchSender()
         launchReceiver()
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
new file mode 100644
index 0000000..32a33ca
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
@@ -0,0 +1,106 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.test.assertEquals
+import kotlin.test.assertTrue
+
+
+@RunWith(Parameterized::class)
+class ChannelSendReceiveStressTest(
+    val kind: TestChannelKind,
+    val nSenders: Int,
+    val nReceivers: Int
+) {
+    companion object {
+        @Parameterized.Parameters(name = "{0}, nSenders={1}, nReceivers={2}")
+        @JvmStatic
+        fun params(): Collection<Array<Any>> =
+                listOf(1, 2, 10).flatMap { nSenders ->
+                    listOf(1, 6).flatMap { nReceivers ->
+                        TestChannelKind.values().map { arrayOf<Any>(it, nSenders, nReceivers) }
+                    }
+                }
+    }
+
+    val nEvents = 1_000_000
+
+    val channel = kind.create()
+    val sendersCompleted = AtomicInteger()
+    val receiversCompleted = AtomicInteger()
+    val dupes = AtomicInteger()
+    val received = ConcurrentHashMap<Int,Int>()
+    val receivedBy = IntArray(nReceivers)
+
+    @Test
+    fun testSendReceiveStress() = runBlocking {
+        val receivers = List(nReceivers) { receiverIndex ->
+            // different event receivers use different code
+            launch(CommonPool + CoroutineName("receiver$receiverIndex")) {
+                when (receiverIndex % 3) {
+                    0 -> doReceive(receiverIndex)
+                    1 -> doReceiveOrNull(receiverIndex)
+                    2 -> doIterator(receiverIndex)
+                }
+                receiversCompleted.incrementAndGet()
+            }
+        }
+        val senders = List(nSenders) { senderIndex ->
+            launch(CommonPool + CoroutineName("sender$senderIndex")) {
+                for (i in senderIndex until nEvents step nSenders)
+                    channel.send(i)
+                sendersCompleted.incrementAndGet()
+            }
+        }
+        senders.forEach { it.join() }
+        channel.close()
+        receivers.forEach { it.join() }
+        println("Tested $kind with nSenders=$nSenders, nReceivers=$nReceivers")
+        println("Completed successfully ${sendersCompleted.get()} sender coroutines")
+        println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
+        println("              Received ${received.size} events")
+        println("        Received dupes ${dupes.get()}")
+        repeat(nReceivers) { receiveIndex ->
+            println("        Received by #$receiveIndex ${receivedBy[receiveIndex]}")
+        }
+        assertEquals(nSenders, sendersCompleted.get())
+        assertEquals(nReceivers, receiversCompleted.get())
+        assertEquals(0, dupes.get())
+        assertEquals(nEvents, received.size)
+        repeat(nReceivers) { receiveIndex ->
+            assertTrue(receivedBy[receiveIndex] > 0, "Each receiver should have received something")
+        }
+    }
+
+    private fun doReceived(receiverIndex: Int, event: Int) {
+        if (received.put(event, event) != null) {
+            println("Duplicate event $event")
+            dupes.incrementAndGet()
+        }
+        receivedBy[receiverIndex]++
+    }
+
+    private suspend fun doReceive(receiverIndex: Int) {
+        while (true) {
+            try { doReceived(receiverIndex, channel.receive()) }
+            catch (ex: ClosedReceiveChannelException) { break }
+        }
+
+    }
+
+    private suspend fun doReceiveOrNull(receiverIndex: Int) {
+        while (true) {
+            doReceived(receiverIndex, channel.receiveOrNull() ?: break)
+        }
+    }
+
+    private suspend fun doIterator(receiverIndex: Int) {
+        for (event in channel) {
+            doReceived(receiverIndex, event)
+        }
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt
deleted file mode 100644
index a8269c3..0000000
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt
+++ /dev/null
@@ -1,84 +0,0 @@
-package kotlinx.coroutines.experimental.channels
-
-import kotlinx.coroutines.experimental.CommonPool
-import kotlinx.coroutines.experimental.join
-import kotlinx.coroutines.experimental.launch
-import kotlinx.coroutines.experimental.runBlocking
-import org.junit.Test
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicInteger
-import kotlin.test.assertEquals
-import kotlin.test.assertTrue
-
-class RendezvousChannelMultipleReceiversStressTest {
-    val nEvents = 1_000_000
-    val nReceivers = 6
-
-    val channel = RendezvousChannel<Int>()
-    val completedSuccessfully = AtomicInteger()
-    val dupes = AtomicInteger()
-    val received = ConcurrentHashMap<Int,Int>()
-    val receivedBy = IntArray(nReceivers)
-
-    @Test
-    fun testStress() = runBlocking {
-        val receivers = List(nReceivers) { receiverIndex ->
-            // different event receivers use different code
-            launch(CommonPool) {
-                when (receiverIndex % 3) {
-                    0 -> doReceive(receiverIndex)
-                    1 -> doReceiveOrNull(receiverIndex)
-                    2 -> doIterator(receiverIndex)
-                }
-                completedSuccessfully.incrementAndGet()
-            }
-        }
-        repeat(nEvents) {
-            channel.send(it)
-        }
-        channel.close()
-        receivers.forEach { it.join() }
-        println("Completed successfully ${completedSuccessfully.get()} coroutines")
-        println("              Received ${received.size} events")
-        println("        Received dupes ${dupes.get()}")
-        repeat(nReceivers) { receiveIndex ->
-            println("        Received by #$receiveIndex ${receivedBy[receiveIndex]}")
-        }
-        assertEquals(nReceivers, completedSuccessfully.get())
-        assertEquals(0, dupes.get())
-        assertEquals(nEvents, received.size)
-        repeat(nReceivers) { receiveIndex ->
-            assertTrue(receivedBy[receiveIndex] > nEvents / nReceivers / 2, "Should be balanced")
-        }
-    }
-
-    private fun doReceived(event: Int) {
-        if (received.put(event, event) != null) {
-            println("Duplicate event $event")
-            dupes.incrementAndGet()
-        }
-    }
-
-    private suspend fun doReceive(receiverIndex: Int) {
-        while (true) {
-            try { doReceived(channel.receive()) }
-            catch (ex: ClosedReceiveChannelException) { break }
-            receivedBy[receiverIndex]++
-        }
-
-    }
-
-    private suspend fun doReceiveOrNull(receiverIndex: Int) {
-        while (true) {
-            doReceived(channel.receiveOrNull() ?: break)
-            receivedBy[receiverIndex]++
-        }
-    }
-
-    private suspend fun doIterator(receiverIndex: Int) {
-        for (event in channel) {
-            doReceived(event)
-            receivedBy[receiverIndex]++
-        }
-    }
-}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
index 669607e..741f5a9 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
@@ -103,14 +103,14 @@
         expect(1)
         launch(context) {
             expect(3)
-            assertEquals(null, q.pool())
+            assertEquals(null, q.poll())
             expect(4)
             assertEquals(2, q.receive())
             expect(7)
-            assertEquals(null, q.pool())
+            assertEquals(null, q.poll())
             yield()
             expect(9)
-            assertEquals(3, q.pool())
+            assertEquals(3, q.poll())
             expect(10)
         }
         expect(2)
@@ -123,4 +123,110 @@
         q.send(3)
         finish(11)
     }
+
+    @Test
+    fun testIteratorClosed() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(context) {
+            expect(3)
+            q.close()
+            expect(4)
+        }
+        expect(2)
+        for (x in q) {
+            expectUnreached()
+        }
+        finish(5)
+    }
+
+    @Test
+    fun testIteratorOne() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(context) {
+            expect(3)
+            q.send(1)
+            expect(4)
+            q.close()
+            expect(5)
+        }
+        expect(2)
+        for (x in q) {
+            expect(6)
+            assertEquals(1, x)
+        }
+        finish(7)
+    }
+
+    @Test
+    fun testIteratorOneWithYield() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(context) {
+            expect(3)
+            q.send(1) // will suspend
+            expect(6)
+            q.close()
+            expect(7)
+        }
+        expect(2)
+        yield() // yield to sender coroutine right before starting for loop
+        expect(4)
+        for (x in q) {
+            expect(5)
+            assertEquals(1, x)
+        }
+        finish(8)
+    }
+
+    @Test
+    fun testIteratorTwo() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(context) {
+            expect(3)
+            q.send(1)
+            expect(4)
+            q.send(2)
+            expect(7)
+            q.close()
+            expect(8)
+        }
+        expect(2)
+        for (x in q) {
+            when (x) {
+                1 -> expect(5)
+                2 -> expect(6)
+                else -> expectUnreached()
+            }
+        }
+        finish(9)
+    }
+
+    @Test
+    fun testIteratorTwoWithYield() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(context) {
+            expect(3)
+            q.send(1) // will suspend
+            expect(6)
+            q.send(2)
+            expect(7)
+            q.close()
+            expect(8)
+        }
+        expect(2)
+        yield() // yield to sender coroutine right before starting for loop
+        expect(4)
+        for (x in q) {
+            when (x) {
+                1 -> expect(5)
+                2 -> expect(9)
+                else -> expectUnreached()
+            }
+        }
+        finish(10)
+    }
 }
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
new file mode 100644
index 0000000..1464151
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
@@ -0,0 +1,40 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.runBlocking
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import kotlin.test.assertEquals
+
+@RunWith(Parameterized::class)
+class SimpleSendReceiveTest(
+    val kind: TestChannelKind,
+    val n: Int
+) {
+    companion object {
+        @Parameterized.Parameters(name = "{0}, n={1}")
+        @JvmStatic
+        fun params(): Collection<Array<Any>> = TestChannelKind.values().flatMap { kind ->
+            listOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100, 1000).map { n ->
+                arrayOf<Any>(kind, n)
+            }
+        }
+    }
+
+    val channel = kind.create()
+
+    @Test
+    fun testSimpleSendReceive() = runBlocking {
+        launch(CommonPool) {
+            repeat(n) { channel.send(it) }
+            channel.close()
+        }
+        var received = 0
+        for (x in channel) {
+            assertEquals(received++, x)
+        }
+        assertEquals(n, received)
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
new file mode 100644
index 0000000..e5533f4
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -0,0 +1,19 @@
+package kotlinx.coroutines.experimental.channels
+
+enum class TestChannelKind {
+    RENDEZVOUS {
+        override fun create(): Channel<Int> = RendezvousChannel<Int>()
+        override fun toString(): String = "RendezvousChannel"
+    },
+    ARRAY_1 {
+        override fun create(): Channel<Int> = ArrayChannel<Int>(1)
+        override fun toString(): String = "ArrayChannel(1)"
+    },
+    ARRAY_10 {
+        override fun create(): Channel<Int> = ArrayChannel<Int>(8)
+        override fun toString(): String = "ArrayChannel(8)"
+    }
+    ;
+
+    abstract fun create(): Channel<Int>
+}