Migrate channels and related operators to common, so channels can be used from JS

Fixes #201
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt
index e2e67d3..6759e8f 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt
@@ -18,7 +18,7 @@
 
 public expect class CompletionHandlerException(message: String, cause: Throwable) : RuntimeException
 
-public expect open class CancellationException(message: String) : IllegalStateException
+public expect open class CancellationException(message: String?) : IllegalStateException
 
 public expect class JobCancellationException(
     message: String,
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
new file mode 100644
index 0000000..5fcfc87
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -0,0 +1,1021 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.internalAnnotations.*
+import kotlinx.coroutines.experimental.intrinsics.*
+import kotlinx.coroutines.experimental.selects.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * Abstract send channel. It is a base class for all send channel implementations.
+ */
+public abstract class AbstractSendChannel<E> : SendChannel<E> {
+    /** @suppress **This is unstable API and it is subject to change.** */
+    protected val queue = LockFreeLinkedListHead()
+
+    // ------ extension points for buffered channels ------
+
+    /**
+     * Returns `true` if [isBufferFull] is always `true`.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected abstract val isBufferAlwaysFull: Boolean
+
+    /**
+     * Returns `true` if this channel's buffer is full.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected abstract val isBufferFull: Boolean
+
+    // ------ internal functions for override by buffered channels ------
+
+    /**
+     * Tries to add element to buffer or to queued receiver.
+     * Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open fun offerInternal(element: E): Any {
+        while (true) {
+            val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
+            val token = receive.tryResumeReceive(element, idempotent = null)
+            if (token != null) {
+                receive.completeResumeReceive(token)
+                return receive.offerResult
+            }
+        }
+    }
+
+    /**
+     * Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
+     * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+        // offer atomically with select
+        val offerOp = describeTryOffer(element)
+        val failure = select.performAtomicTrySelect(offerOp)
+        if (failure != null) return failure
+        val receive = offerOp.result
+        receive.completeResumeReceive(offerOp.resumeToken!!)
+        return receive.offerResult
+    }
+
+    // ------ state functions & helpers for concrete implementations ------
+
+    /**
+     * Returns non-null closed token if it is last in the queue.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected val closedForSend: Closed<*>? get() = queue.prev as? Closed<*>
+
+    /**
+     * Returns non-null closed token if it is first in the queue.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected val closedForReceive: Closed<*>? get() = queue.next as? Closed<*>
+
+    /**
+     * Retrieves first sending waiter from the queue or returns closed token.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected fun takeFirstSendOrPeekClosed(): Send? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
+
+    /**
+     * Queues buffered element, returns null on success or
+     * returns node reference if it was already closed or is waiting for receive.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
+        queue.addLastIfPrev(SendBuffered(element), { prev ->
+            if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
+            true
+        })
+        return null
+    }
+
+    /**
+     * Queues conflated element, returns null on success or
+     * returns node reference if it was already closed or is waiting for receive.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected fun sendConflated(element: E): ReceiveOrClosed<*>? {
+        val node = SendBuffered(element)
+        queue.addLastIfPrev(node, { prev ->
+            if (prev is ReceiveOrClosed<*>) return@sendConflated prev
+            true
+        })
+        conflatePreviousSendBuffered(node)
+        return null
+    }
+
+    /**
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
+        val prev = node.prev
+        (prev as? SendBuffered<*>)?.remove()
+    }
+
+    /**
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected fun describeSendBuffered(element: E): AddLastDesc<*> = SendBufferedDesc(queue, element)
+
+    private open class SendBufferedDesc<E>(
+        queue: LockFreeLinkedListHead,
+        element: E
+    ) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
+        override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+            if (affected is ReceiveOrClosed<*>) return OFFER_FAILED
+            return null
+        }
+    }
+
+    /**
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected fun describeSendConflated(element: E): AddLastDesc<*> = SendConflatedDesc(queue, element)
+
+    private class SendConflatedDesc<E>(
+        queue: LockFreeLinkedListHead,
+        element: E
+    ) : SendBufferedDesc<E>(queue, element) {
+        override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
+            super.finishOnSuccess(affected, next)
+            // remove previous SendBuffered
+            (affected as? SendBuffered<*>)?.remove()
+        }
+    }
+
+    // ------ SendChannel ------
+
+    public final override val isClosedForSend: Boolean get() = closedForSend != null
+    public final override val isFull: Boolean get() = queue.next !is ReceiveOrClosed<*> && isBufferFull
+
+    public final override suspend fun send(element: E) {
+        // fast path -- try offer non-blocking
+        if (offer(element)) return
+        // slow-path does suspend
+        return sendSuspend(element)
+    }
+
+    public final override fun offer(element: E): Boolean {
+        val result = offerInternal(element)
+        return when {
+            result === OFFER_SUCCESS -> true
+            result === OFFER_FAILED -> false
+            result is Closed<*> -> throw result.sendException
+            else -> error("offerInternal returned $result")
+        }
+    }
+
+    private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
+        val send = SendElement(element, cont)
+        loop@ while (true) {
+            val enqueueResult = enqueueSend(send)
+            when (enqueueResult) {
+                null -> { // enqueued successfully
+                    cont.initCancellability() // make it properly cancellable
+                    cont.removeOnCancel(send)
+                    return@sc
+                }
+                is Closed<*> -> {
+                    cont.resumeWithException(enqueueResult.sendException)
+                    return@sc
+                }
+            }
+            // hm... receiver is waiting or buffer is not full. try to offer
+            val offerResult = offerInternal(element)
+            when {
+                offerResult === OFFER_SUCCESS -> {
+                    cont.resume(Unit)
+                    return@sc
+                }
+                offerResult === OFFER_FAILED -> continue@loop
+                offerResult is Closed<*> -> {
+                    cont.resumeWithException(offerResult.sendException)
+                    return@sc
+                }
+                else -> error("offerInternal returned $offerResult")
+            }
+        }
+    }
+
+    /**
+     * Result is:
+     * * null -- successfully enqueued
+     * * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
+     * * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
+     */
+    private fun enqueueSend(send: SendElement): Any? {
+        if (isBufferAlwaysFull) {
+            queue.addLastIfPrev(send, { prev ->
+                if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
+                true
+            })
+        } else {
+            if (!queue.addLastIfPrevAndIf(send, { prev ->
+                if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
+                true
+            }, { isBufferFull }))
+                return ENQUEUE_FAILED
+        }
+        return null
+    }
+
+    public override fun close(cause: Throwable?): Boolean {
+        val closed = Closed<E>(cause)
+        while (true) {
+            val receive = takeFirstReceiveOrPeekClosed()
+            if (receive == null) {
+                // queue empty or has only senders -- try add last "Closed" item to the queue
+                if (queue.addLastIfPrev(closed, { prev ->
+                    if (prev is Closed<*>) return false // already closed
+                    prev !is ReceiveOrClosed<*> // only add close if no waiting receive
+                })) {
+                    onClosed(closed)
+                    afterClose(cause)
+                    return true
+                }
+                continue // retry on failure
+            }
+            if (receive is Closed<*>) return false // already marked as closed -- nothing to do
+            receive as Receive<E> // type assertion
+            receive.resumeReceiveClosed(closed)
+        }
+    }
+
+    /**
+     * Invoked when [Closed] element was just added.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open fun onClosed(closed: Closed<E>) {}
+
+    /**
+     * Invoked after successful [close].
+     */
+    protected open fun afterClose(cause: Throwable?) {}
+
+    /**
+     * Retrieves first receiving waiter from the queue or returns closed token.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ it is Closed<*> })
+
+    // ------ registerSelectSend ------
+
+    /**
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
+
+    /**
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected class TryOfferDesc<E>(
+        @JvmField val element: E,
+        queue: LockFreeLinkedListHead
+    ) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
+        @JvmField var resumeToken: Any? = null
+
+        override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+            if (affected !is ReceiveOrClosed<*>) return OFFER_FAILED
+            if (affected is Closed<*>) return affected
+            return null
+        }
+
+        override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
+            val token = node.tryResumeReceive(element, idempotent = this) ?: return false
+            resumeToken = token
+            return true
+        }
+    }
+
+    private inner class TryEnqueueSendDesc<R>(
+        element: E,
+        select: SelectInstance<R>,
+        block: suspend (SendChannel<E>) -> R
+    ) : AddLastDesc<SendSelect<E, R>>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) {
+        override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+            if (affected is ReceiveOrClosed<*>) {
+                return affected as? Closed<*> ?: ENQUEUE_FAILED
+            }
+            return null
+        }
+
+        override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
+            if (!isBufferFull) return ENQUEUE_FAILED
+            return super.onPrepare(affected, next)
+        }
+
+        override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
+            super.finishOnSuccess(affected, next)
+            // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
+            node.disposeOnSelect()
+        }
+    }
+
+    final override val onSend: SelectClause2<E, SendChannel<E>>
+        get() = object : SelectClause2<E, SendChannel<E>> {
+            override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
+                registerSelectSend(select, param, block)
+            }
+        }
+
+    private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
+        while (true) {
+            if (select.isSelected) return
+            if (isFull) {
+                val enqueueOp = TryEnqueueSendDesc(element, select, block)
+                val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+                when {
+                    enqueueResult === ALREADY_SELECTED -> return
+                    enqueueResult === ENQUEUE_FAILED -> {} // retry
+                    enqueueResult is Closed<*> -> throw enqueueResult.sendException
+                    else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
+                }
+            } else {
+                val offerResult = offerSelectInternal(element, select)
+                when {
+                    offerResult === ALREADY_SELECTED -> return
+                    offerResult === OFFER_FAILED -> {} // retry
+                    offerResult === OFFER_SUCCESS -> {
+                        block.startCoroutineUndispatched(receiver = this, completion = select.completion)
+                        return
+                    }
+                    offerResult is Closed<*> -> throw offerResult.sendException
+                    else -> error("offerSelectInternal returned $offerResult")
+                }
+            }
+        }
+    }
+
+    // ------ debug ------
+
+    public override fun toString() =
+        "$classSimpleName@$hexAddress{$queueDebugStateString}$bufferDebugString"
+
+    private val queueDebugStateString: String
+        get() {
+            val head = queue.next
+            if (head === queue) return "EmptyQueue"
+            var result = when (head) {
+                is Closed<*> -> head.toString()
+                is Receive<*> -> "ReceiveQueued"
+                is Send -> "SendQueued"
+                else -> "UNEXPECTED:$head" // should not happen
+            }
+            val tail = queue.prev
+            if (tail !== head) {
+                result += ",queueSize=${countQueueSize()}"
+                if (tail is Closed<*>) result += ",closedForSend=$tail"
+            }
+            return result
+        }
+
+    private fun countQueueSize(): Int {
+        var size = 0
+        queue.forEach<LockFreeLinkedListNode> { size++ }
+        return size
+    }
+
+    protected open val bufferDebugString: String get() = ""
+
+    // ------ private ------
+
+    private class SendSelect<E, R>(
+        override val pollResult: Any?,
+        @JvmField val channel: SendChannel<E>,
+        @JvmField val select: SelectInstance<R>,
+        @JvmField val block: suspend (SendChannel<E>) -> R
+    ) : LockFreeLinkedListNode(), Send, DisposableHandle {
+        override fun tryResumeSend(idempotent: Any?): Any? =
+            if (select.trySelect(idempotent)) SELECT_STARTED else null
+
+        override fun completeResumeSend(token: Any) {
+            check(token === SELECT_STARTED)
+            block.startCoroutine(receiver = channel, completion = select.completion)
+        }
+
+        fun disposeOnSelect() {
+            select.disposeOnSelect(this)
+        }
+
+        override fun dispose() {
+            remove()
+        }
+
+        override fun resumeSendClosed(closed: Closed<*>) {
+            if (select.trySelect(null))
+                select.resumeSelectCancellableWithException(closed.sendException)
+        }
+
+        override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
+    }
+
+    private class SendBuffered<out E>(
+        @JvmField val element: E
+    ) : LockFreeLinkedListNode(), Send {
+        override val pollResult: Any? get() = element
+        override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
+        override fun completeResumeSend(token: Any) { check(token === SEND_RESUMED) }
+        override fun resumeSendClosed(closed: Closed<*>) {}
+    }
+}
+
+/**
+ * Abstract send/receive channel. It is a base class for all channel implementations.
+ */
+public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E> {
+    // ------ extension points for buffered channels ------
+
+    /**
+     * Returns `true` if [isBufferEmpty] is always `true`.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected abstract val isBufferAlwaysEmpty: Boolean
+
+    /**
+     * Returns `true` if this channel's buffer is empty.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected abstract val isBufferEmpty: Boolean
+
+    // ------ internal functions for override by buffered channels ------
+
+    /**
+     * Tries to remove element from buffer or from queued sender.
+     * Return type is `E | POLL_FAILED | Closed`
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open fun pollInternal(): Any? {
+        while (true) {
+            val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
+            val token = send.tryResumeSend(idempotent = null)
+            if (token != null) {
+                send.completeResumeSend(token)
+                return send.pollResult
+            }
+        }
+    }
+
+    /**
+     * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
+     * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
+        // poll atomically with select
+        val pollOp = describeTryPoll()
+        val failure = select.performAtomicTrySelect(pollOp)
+        if (failure != null) return failure
+        val send = pollOp.result
+        send.completeResumeSend(pollOp.resumeToken!!)
+        return pollOp.pollResult
+    }
+
+    // ------ state functions & helpers for concrete implementations ------
+
+    /**
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected val hasReceiveOrClosed: Boolean get() = queue.next is ReceiveOrClosed<*>
+
+    // ------ ReceiveChannel ------
+
+    public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
+    public final override val isEmpty: Boolean get() = queue.next !is Send && isBufferEmpty
+
+    @Suppress("UNCHECKED_CAST")
+    public final override suspend fun receive(): E {
+        // fast path -- try poll non-blocking
+        val result = pollInternal()
+        if (result !== POLL_FAILED) return receiveResult(result)
+        // slow-path does suspend
+        return receiveSuspend()
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private fun receiveResult(result: Any?): E {
+        if (result is Closed<*>) throw result.receiveException
+        return result as E
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private suspend fun receiveSuspend(): E = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
+        val receive = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
+        while (true) {
+            if (enqueueReceive(receive)) {
+                cont.initCancellability() // make it properly cancellable
+                removeReceiveOnCancel(cont, receive)
+                return@sc
+            }
+            // hm... something is not right. try to poll
+            val result = pollInternal()
+            if (result is Closed<*>) {
+                cont.resumeWithException(result.receiveException)
+                return@sc
+            }
+            if (result !== POLL_FAILED) {
+                cont.resume(result as E)
+                return@sc
+            }
+        }
+    }
+
+    private fun enqueueReceive(receive: Receive<E>): Boolean {
+        val result = if (isBufferAlwaysEmpty)
+            queue.addLastIfPrev(receive, { it !is Send }) else
+            queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
+        if (result) onReceiveEnqueued()
+        return result
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    public final override suspend fun receiveOrNull(): E? {
+        // fast path -- try poll non-blocking
+        val result = pollInternal()
+        if (result !== POLL_FAILED) return receiveOrNullResult(result)
+        // slow-path does suspend
+        return receiveOrNullSuspend()
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private fun receiveOrNullResult(result: Any?): E? {
+        if (result is Closed<*>) {
+            if (result.closeCause != null) throw result.closeCause
+            return null
+        }
+        return result as E
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private suspend fun receiveOrNullSuspend(): E? = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
+        val receive = ReceiveElement(cont, nullOnClose = true)
+        while (true) {
+            if (enqueueReceive(receive)) {
+                cont.initCancellability() // make it properly cancellable
+                removeReceiveOnCancel(cont, receive)
+                return@sc
+            }
+            // hm... something is not right. try to poll
+            val result = pollInternal()
+            if (result is Closed<*>) {
+                if (result.closeCause == null)
+                    cont.resume(null)
+                else
+                    cont.resumeWithException(result.closeCause)
+                return@sc
+            }
+            if (result !== POLL_FAILED) {
+                cont.resume(result as E)
+                return@sc
+            }
+        }
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    public final override fun poll(): E? {
+        val result = pollInternal()
+        return if (result === POLL_FAILED) null else receiveOrNullResult(result)
+    }
+
+    override fun cancel(cause: Throwable?): Boolean =
+        close(cause).also {
+            cleanupSendQueueOnCancel()
+        }
+
+    // Note: this function is invoked when channel is already closed
+    protected open fun cleanupSendQueueOnCancel() {
+        val closed = closedForSend ?: error("Cannot happen")
+        while (true) {
+            val send = takeFirstSendOrPeekClosed() ?: error("Cannot happen")
+            if (send is Closed<*>) {
+                check(send === closed)
+                return // cleaned
+            }
+            send.resumeSendClosed(closed)
+        }
+    }
+
+    public final override fun iterator(): ChannelIterator<E> = Itr(this)
+
+    // ------ registerSelectReceive ------
+
+    /**
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
+
+    /**
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
+        @JvmField var resumeToken: Any? = null
+        @JvmField var pollResult: E? = null
+
+        override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+            if (affected is Closed<*>) return affected
+            if (affected !is Send) return POLL_FAILED
+            return null
+        }
+
+        @Suppress("UNCHECKED_CAST")
+        override fun validatePrepared(node: Send): Boolean {
+            val token = node.tryResumeSend(idempotent = this) ?: return false
+            resumeToken = token
+            pollResult = node.pollResult as E
+            return true
+        }
+    }
+
+    private inner class TryEnqueueReceiveDesc<E, R>(
+        select: SelectInstance<R>,
+        block: suspend (E?) -> R,
+        nullOnClose: Boolean
+    ) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, nullOnClose)) {
+        override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+            if (affected is Send) return ENQUEUE_FAILED
+            return null
+        }
+
+        override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
+            if (!isBufferEmpty) return ENQUEUE_FAILED
+            return super.onPrepare(affected, next)
+        }
+
+        override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
+            super.finishOnSuccess(affected, next)
+            // notify the there is one more receiver
+            onReceiveEnqueued()
+            // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
+            node.removeOnSelectCompletion()
+        }
+    }
+
+    final override val onReceive: SelectClause1<E>
+        get() = object : SelectClause1<E> {
+            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
+                registerSelectReceive(select, block)
+            }
+        }
+
+    @Suppress("UNCHECKED_CAST")
+    private fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
+        while (true) {
+            if (select.isSelected) return
+            if (isEmpty) {
+                val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
+                val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+                when {
+                    enqueueResult === ALREADY_SELECTED -> return
+                    enqueueResult === ENQUEUE_FAILED -> {} // retry
+                    else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
+                }
+            } else {
+                val pollResult = pollSelectInternal(select)
+                when {
+                    pollResult === ALREADY_SELECTED -> return
+                    pollResult === POLL_FAILED -> {} // retry
+                    pollResult is Closed<*> -> throw pollResult.receiveException
+                    else -> {
+                        block.startCoroutineUndispatched(pollResult as E, select.completion)
+                        return
+                    }
+                }
+            }
+        }
+    }
+
+    final override val onReceiveOrNull: SelectClause1<E?>
+        get() = object : SelectClause1<E?> {
+            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
+                registerSelectReceiveOrNull(select, block)
+            }
+        }
+
+    @Suppress("UNCHECKED_CAST")
+    private fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
+        while (true) {
+            if (select.isSelected) return
+            if (isEmpty) {
+                val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
+                val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+                when {
+                    enqueueResult === ALREADY_SELECTED -> return
+                    enqueueResult === ENQUEUE_FAILED -> {} // retry
+                    else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
+                }
+            } else {
+                val pollResult = pollSelectInternal(select)
+                when {
+                    pollResult === ALREADY_SELECTED -> return
+                    pollResult === POLL_FAILED -> {} // retry
+                    pollResult is Closed<*> -> {
+                        if (pollResult.closeCause == null) {
+                            if (select.trySelect(null))
+                                block.startCoroutineUndispatched(null, select.completion)
+                            return
+                        } else
+                            throw pollResult.closeCause
+                    }
+                    else -> {
+                        // selected successfully
+                        block.startCoroutineUndispatched(pollResult as E, select.completion)
+                        return
+                    }
+                }
+            }
+        }
+    }
+
+    // ------ protected ------
+
+    override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
+        super.takeFirstReceiveOrPeekClosed().also {
+            if (it != null && it !is Closed<*>) onReceiveDequeued()
+        }
+
+    /**
+     * Invoked when receiver is successfully enqueued to the queue of waiting receivers.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open fun onReceiveEnqueued() {}
+
+    /**
+     * Invoked when enqueued receiver was successfully removed from the queue of waiting receivers.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open fun onReceiveDequeued() {}
+
+    // ------ private ------
+
+    private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) {
+        cont.invokeOnCompletion {
+            if (cont.isCancelled && receive.remove())
+                onReceiveDequeued()
+        }
+    }
+
+    private class Itr<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
+        var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed
+
+        override suspend fun hasNext(): Boolean {
+            // check for repeated hasNext
+            if (result !== POLL_FAILED) return hasNextResult(result)
+            // fast path -- try poll non-blocking
+            result = channel.pollInternal()
+            if (result !== POLL_FAILED) return hasNextResult(result)
+            // slow-path does suspend
+            return hasNextSuspend()
+        }
+
+        private fun hasNextResult(result: Any?): Boolean {
+            if (result is Closed<*>) {
+                if (result.closeCause != null) throw result.receiveException
+                return false
+            }
+            return true
+        }
+
+        private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
+            val receive = ReceiveHasNext(this, cont)
+            while (true) {
+                if (channel.enqueueReceive(receive)) {
+                    cont.initCancellability() // make it properly cancellable
+                    channel.removeReceiveOnCancel(cont, receive)
+                    return@sc
+                }
+                // hm... something is not right. try to poll
+                val result = channel.pollInternal()
+                this.result = result
+                if (result is Closed<*>) {
+                    if (result.closeCause == null)
+                        cont.resume(false)
+                    else
+                        cont.resumeWithException(result.receiveException)
+                    return@sc
+                }
+                if (result !== POLL_FAILED) {
+                    cont.resume(true)
+                    return@sc
+                }
+            }
+        }
+
+        @Suppress("UNCHECKED_CAST")
+        override suspend fun next(): E {
+            val result = this.result
+            if (result is Closed<*>) throw result.receiveException
+            if (result !== POLL_FAILED) {
+                this.result = POLL_FAILED
+                return result as E
+            }
+            // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
+            return channel.receive()
+        }
+    }
+
+    private class ReceiveElement<in E>(
+        @JvmField val cont: CancellableContinuation<E?>,
+        @JvmField val nullOnClose: Boolean
+    ) : Receive<E>() {
+        override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(value, idempotent)
+        override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+        override fun resumeReceiveClosed(closed: Closed<*>) {
+            if (closed.closeCause == null && nullOnClose)
+                cont.resume(null)
+            else
+                cont.resumeWithException(closed.receiveException)
+        }
+        override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
+    }
+
+    private class ReceiveHasNext<E>(
+        @JvmField val iterator: Itr<E>,
+        @JvmField val cont: CancellableContinuation<Boolean>
+    ) : Receive<E>() {
+        override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
+            val token = cont.tryResume(true, idempotent)
+            if (token != null) {
+                /*
+                   When idempotent != null this invocation can be stale and we cannot directly update iterator.result
+                   Instead, we save both token & result into a temporary IdempotentTokenValue object and
+                   set iterator result only in completeResumeReceive that is going to be invoked just once
+                 */
+                if (idempotent != null) return IdempotentTokenValue(token, value)
+                iterator.result = value
+            }
+            return token
+        }
+
+        override fun completeResumeReceive(token: Any) {
+            if (token is IdempotentTokenValue<*>) {
+                iterator.result = token.value
+                cont.completeResume(token.token)
+            } else
+                cont.completeResume(token)
+        }
+
+        override fun resumeReceiveClosed(closed: Closed<*>) {
+            val token = if (closed.closeCause == null)
+                cont.tryResume(false)
+            else
+                cont.tryResumeWithException(closed.receiveException)
+            if (token != null) {
+                iterator.result = closed
+                cont.completeResume(token)
+            }
+        }
+        override fun toString(): String = "ReceiveHasNext[$cont]"
+    }
+
+    private inner class ReceiveSelect<R, in E>(
+        @JvmField val select: SelectInstance<R>,
+        @JvmField val block: suspend (E?) -> R,
+        @JvmField val nullOnClose: Boolean
+    ) : Receive<E>(), DisposableHandle {
+        override fun tryResumeReceive(value: E, idempotent: Any?): Any?  =
+            if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
+
+        @Suppress("UNCHECKED_CAST")
+        override fun completeResumeReceive(token: Any) {
+            val value: E = (if (token === NULL_VALUE) null else token) as E
+            block.startCoroutine(value, select.completion)
+        }
+
+        override fun resumeReceiveClosed(closed: Closed<*>) {
+            if (select.trySelect(null)) {
+                if (closed.closeCause == null && nullOnClose) {
+                    block.startCoroutine(null, select.completion)
+                } else {
+                    // even though we are dispatching coroutine to process channel close on receive,
+                    // which is an atomically cancellable suspending function,
+                    // close is a final state, so we can use a cancellable resume mode
+                    select.resumeSelectCancellableWithException(closed.receiveException)
+                }
+            }
+        }
+
+        fun removeOnSelectCompletion() {
+            select.disposeOnSelect(this)
+        }
+
+        override fun dispose() { // invoked on select completion
+            if (remove())
+                onReceiveDequeued() // notify cancellation of receive
+        }
+
+        override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
+    }
+
+    private class IdempotentTokenValue<out E>(
+        @JvmField val token: Any,
+        @JvmField val value: E
+    )
+}
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val POLL_FAILED: Any = Symbol("POLL_FAILED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val NULL_VALUE: Any = Symbol("NULL_VALUE")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val SEND_RESUMED = Symbol("SEND_RESUMED")
+
+/**
+ * Represents sending waiter in the queue.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public interface Send {
+    val pollResult: Any? // E | Closed
+    fun tryResumeSend(idempotent: Any?): Any?
+    fun completeResumeSend(token: Any)
+    fun resumeSendClosed(closed: Closed<*>)
+}
+
+/**
+ * Represents receiver waiter in the queue or closed token.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public interface ReceiveOrClosed<in E> {
+    val offerResult: Any // OFFER_SUCCESS | Closed
+    fun tryResumeReceive(value: E, idempotent: Any?): Any?
+    fun completeResumeReceive(token: Any)
+}
+
+/**
+ * Represents sender for a specific element.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+@Suppress("UNCHECKED_CAST")
+public class SendElement(
+    override val pollResult: Any?,
+    @JvmField val cont: CancellableContinuation<Unit>
+) : LockFreeLinkedListNode(), Send {
+    override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
+    override fun completeResumeSend(token: Any) = cont.completeResume(token)
+    override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
+    override fun toString(): String = "SendElement($pollResult)[$cont]"
+}
+
+/**
+ * Represents closed channel.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public class Closed<in E>(
+    @JvmField val closeCause: Throwable?
+) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
+    val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
+    val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
+
+    override val offerResult get() = this
+    override val pollResult get() = this
+    override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED
+    override fun completeResumeSend(token: Any) { check(token === CLOSE_RESUMED) }
+    override fun tryResumeReceive(value: E, idempotent: Any?): Any? = CLOSE_RESUMED
+    override fun completeResumeReceive(token: Any) { check(token === CLOSE_RESUMED) }
+    override fun resumeSendClosed(closed: Closed<*>) = error("Should be never invoked")
+    override fun toString(): String = "Closed[$closeCause]"
+}
+
+private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
+    override val offerResult get() = OFFER_SUCCESS
+    abstract fun resumeReceiveClosed(closed: Closed<*>)
+}
+
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
new file mode 100644
index 0000000..ffdf694
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.internalAnnotations.*
+import kotlinx.coroutines.experimental.selects.*
+
+/**
+ * Broadcast channel with array buffer of a fixed [capacity].
+ * Sender suspends only when buffer is full due to one of the receives being slow to consume and
+ * receiver suspends only when buffer is empty.
+ *
+ * Note, that elements that are sent to the broadcast channel while there are no [openSubscription] subscribers are immediately
+ * lost.
+ *
+ * This channel is created by `BroadcastChannel(capacity)` factory function invocation.
+ *
+ * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
+ * The lock at each subscription is also used to manage concurrent attempts to receive from the same subscriber.
+ * The lists of suspended senders or receivers are lock-free.
+ */
+class ArrayBroadcastChannel<E>(
+    /**
+     * Buffer capacity.
+     */
+    val capacity: Int
+) : AbstractSendChannel<E>(), BroadcastChannel<E> {
+    init {
+        require(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
+    }
+
+    private val bufferLock = ReentrantLock()
+    private val buffer = arrayOfNulls<Any?>(capacity) // guarded by bufferLock
+
+    // head & tail are Long (64 bits) and we assume that they never wrap around
+    // head, tail, and size are guarded by bufferLock
+    @Volatile
+    private var head: Long = 0 // do modulo on use of head
+    @Volatile
+    private var tail: Long = 0 // do modulo on use of tail
+    @Volatile
+    private var size: Int = 0
+
+    /*
+        Writes to buffer are guarded by bufferLock, but reads from buffer are concurrent with writes
+          - Write element to buffer then write "tail" (volatile)
+          - Read "tail" (volatile), then read element from buffer
+        So read/writes to buffer need not be volatile
+     */
+
+    private val subs = subscriberList<Subscriber<E>>()
+
+    override val isBufferAlwaysFull: Boolean get() = false
+    override val isBufferFull: Boolean get() = size >= capacity
+
+    override fun openSubscription(): SubscriptionReceiveChannel<E> =
+        Subscriber(this).also {
+            updateHead(addSub = it)
+        }
+
+    override fun close(cause: Throwable?): Boolean {
+        if (!super.close(cause)) return false
+        checkSubOffers()
+        return true
+    }
+
+    // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
+    override fun offerInternal(element: E): Any {
+        bufferLock.withLock {
+            // check if closed for send (under lock, so size cannot change)
+            closedForSend?.let { return it }
+            val size = this.size
+            if (size >= capacity) return OFFER_FAILED
+            val tail = this.tail
+            buffer[(tail % capacity).toInt()] = element
+            this.size = size + 1
+            this.tail = tail + 1
+        }
+        // if offered successfully, then check subs outside of lock
+        checkSubOffers()
+        return OFFER_SUCCESS
+    }
+
+    // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
+    override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+        bufferLock.withLock {
+            // check if closed for send (under lock, so size cannot change)
+            closedForSend?.let { return it }
+            val size = this.size
+            if (size >= capacity) return OFFER_FAILED
+            // let's try to select sending this element to buffer
+            if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
+                return ALREADY_SELECTED
+            }
+            val tail = this.tail
+            buffer[(tail % capacity).toInt()] = element
+            this.size = size + 1
+            this.tail = tail + 1
+        }
+        // if offered successfully, then check subs outside of lock
+        checkSubOffers()
+        return OFFER_SUCCESS
+    }
+
+    private fun checkSubOffers() {
+        var updated = false
+        var hasSubs = false
+        @Suppress("LoopToCallChain") // must invoke `checkOffer` on every sub
+        for (sub in subs) {
+            hasSubs = true
+            if (sub.checkOffer()) updated = true
+        }
+        if (updated || !hasSubs)
+            updateHead()
+    }
+
+    // updates head if needed and optionally adds / removes subscriber under the same lock
+    private tailrec fun updateHead(addSub: Subscriber<E>? = null, removeSub: Subscriber<E>? = null) {
+        // update head in a tail rec loop
+        var send: Send? = null
+        var token: Any? = null
+        bufferLock.withLock {
+            if (addSub != null) {
+                addSub.subHead = tail // start from last element
+                val wasEmpty = subs.isEmpty()
+                subs.add(addSub)
+                if (!wasEmpty) return // no need to update when adding second and etc sub
+            }
+            if (removeSub != null) {
+                subs.remove(removeSub)
+                if (head != removeSub.subHead) return // no need to update
+            }
+            val minHead = computeMinHead()
+            val tail = this.tail
+            var head = this.head
+            val targetHead = minHead.coerceAtMost(tail)
+            if (targetHead <= head) return // nothing to do -- head was already moved
+            var size = this.size
+            // clean up removed (on not need if we don't have any subscribers anymore)
+            while (head < targetHead) {
+                buffer[(head % capacity).toInt()] = null
+                val wasFull = size >= capacity
+                // update the size before checking queue (no more senders can queue up)
+                this.head = ++head
+                this.size = --size
+                if (wasFull) {
+                    while (true) {
+                        send = takeFirstSendOrPeekClosed() ?: break // when when no sender
+                        if (send is Closed<*>) break // break when closed for send
+                        token = send!!.tryResumeSend(idempotent = null)
+                        if (token != null) {
+                            // put sent element to the buffer
+                            buffer[(tail % capacity).toInt()] = (send as Send).pollResult
+                            this.size = size + 1
+                            this.tail = tail + 1
+                            return@withLock // go out of lock to wakeup this sender
+                        }
+                    }
+                }
+            }
+            return // done updating here -> return
+        }
+        // we only get out of the lock normally when there is a sender to resume
+        send!!.completeResumeSend(token!!)
+        // since we've just sent an element, we might need to resume some receivers
+        checkSubOffers()
+        // tailrec call to recheck
+        updateHead()
+    }
+
+    private fun computeMinHead(): Long {
+        var minHead = Long.MAX_VALUE
+        for (sub in subs)
+            minHead = minHead.coerceAtMost(sub.subHead) // volatile (atomic) reads of subHead
+        return minHead
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private fun elementAt(index: Long): E = buffer[(index % capacity).toInt()] as E
+
+    private class Subscriber<E>(
+        private val broadcastChannel: ArrayBroadcastChannel<E>
+    ) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
+        private val subLock = ReentrantLock()
+
+        @Volatile
+        @JvmField
+        var subHead: Long = 0 // guarded by subLock
+
+        override val isBufferAlwaysEmpty: Boolean get() = false
+        override val isBufferEmpty: Boolean get() = subHead >= broadcastChannel.tail
+        override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
+        override val isBufferFull: Boolean get() = error("Should not be used")
+
+        override fun cancel(cause: Throwable?): Boolean =
+            close(cause).also { closed ->
+                if (closed) broadcastChannel.updateHead(removeSub = this)
+            }
+
+        // returns true if subHead was updated and broadcast channel's head must be checked
+        // this method is lock-free (it never waits on lock)
+        @Suppress("UNCHECKED_CAST")
+        fun checkOffer(): Boolean {
+            var updated = false
+            var closed: Closed<*>? = null
+        loop@
+            while (needsToCheckOfferWithoutLock()) {
+                // just use `tryLock` here and break when some other thread is checking under lock
+                // it means that `checkOffer` must be retried after every `unlock`
+                if (!subLock.tryLock()) break
+                val receive: ReceiveOrClosed<E>?
+                val token: Any?
+                try {
+                    val result = peekUnderLock()
+                    when {
+                        result === POLL_FAILED -> continue@loop // must retest `needsToCheckOfferWithoutLock` outside of the lock
+                        result is Closed<*> -> {
+                            closed = result
+                            break@loop // was closed
+                        }
+                    }
+                    // find a receiver for an element
+                    receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving
+                    if (receive is Closed<*>) break // noting more to do if this sub already closed
+                    token = receive.tryResumeReceive(result as E, idempotent = null)
+                    if (token == null) continue // bail out here to next iteration (see for next receiver)
+                    val subHead = this.subHead
+                    this.subHead = subHead + 1 // retrieved element for this subscriber
+                    updated = true
+                } finally {
+                    subLock.unlock()
+                }
+                receive!!.completeResumeReceive(token!!)
+            }
+            // do close outside of lock if needed
+            closed?.also { close(cause = it.closeCause) }
+            return updated
+        }
+
+        // result is `E | POLL_FAILED | Closed`
+        override fun pollInternal(): Any? {
+            var updated = false
+            val result = subLock.withLock {
+                val result = peekUnderLock()
+                when {
+                    result is Closed<*> -> { /* just bail out of lock */ }
+                    result === POLL_FAILED -> { /* just bail out of lock */ }
+                    else -> {
+                        // update subHead after retrieiving element from buffer
+                        val subHead = this.subHead
+                        this.subHead = subHead + 1
+                        updated = true
+                    }
+                }
+                result
+            }
+            // do close outside of lock
+            (result as? Closed<*>)?.also { close(cause = it.closeCause) }
+            // there could have been checkOffer attempt while we were holding lock
+            // now outside the lock recheck if anything else to offer
+            if (checkOffer())
+                updated = true
+            // and finally update broadcast's channel head if needed
+            if (updated)
+                broadcastChannel.updateHead()
+            return result
+        }
+
+        // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+        override fun pollSelectInternal(select: SelectInstance<*>): Any? {
+            var updated = false
+            val result = subLock.withLock {
+                var result = peekUnderLock()
+                when {
+                    result is Closed<*> -> { /* just bail out of lock */ }
+                    result === POLL_FAILED -> { /* just bail out of lock */ }
+                    else -> {
+                        // let's try to select receiving this element from buffer
+                        if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
+                            result = ALREADY_SELECTED
+                        } else {
+                            // update subHead after retrieiving element from buffer
+                            val subHead = this.subHead
+                            this.subHead = subHead + 1
+                            updated = true
+                        }
+                    }
+                }
+                result
+            }
+            // do close outside of lock
+            (result as? Closed<*>)?.also { close(cause = it.closeCause) }
+            // there could have been checkOffer attempt while we were holding lock
+            // now outside the lock recheck if anything else to offer
+            if (checkOffer())
+                updated = true
+            // and finally update broadcast's channel head if needed
+            if (updated)
+                broadcastChannel.updateHead()
+            return result
+        }
+
+        // Must invoke this check this after lock, because offer's invocation of `checkOffer` might have failed
+        // to `tryLock` just before the lock was about to unlocked, thus loosing notification to this
+        // subscription about an element that was just offered
+        private fun needsToCheckOfferWithoutLock(): Boolean {
+            if (closedForReceive != null)
+                return false // already closed -> nothing to do
+            if (isBufferEmpty && broadcastChannel.closedForReceive == null)
+                return false // no data for us && broadcast channel was not closed yet -> nothing to do
+            return true // check otherwise
+        }
+
+        // guarded by lock, returns:
+        //      E - the element from the buffer at subHead
+        //      Closed<*> when closed;
+        //      POLL_FAILED when there seems to be no data, but must retest `needsToCheckOfferWithoutLock` outside of lock
+        private fun peekUnderLock(): Any? {
+            val subHead = this.subHead // guarded read (can be non-volatile read)
+            // note: from the broadcastChannel we must read closed token first, then read its tail
+            // because it is Ok if tail moves in between the reads (we make decision based on tail first)
+            val closedBroadcast = broadcastChannel.closedForReceive // unguarded volatile read
+            val tail = broadcastChannel.tail // unguarded volatile read
+            if (subHead >= tail) {
+                // no elements to poll from the queue -- check if closed broads & closed this sub
+                // must retest `needsToCheckOfferWithoutLock` outside of the lock
+                return closedBroadcast ?: this.closedForReceive ?: POLL_FAILED
+            }
+            // Get tentative result. This result may be wrong (completely invalid value, including null),
+            // because this subscription might get closed, moving channel's head past this subscription's head.
+            val result = broadcastChannel.elementAt(subHead)
+            // now check if this subscription was closed
+            val closedSub = this.closedForReceive
+            if (closedSub != null) return closedSub
+            // we know the subscription was not closed, so this tentative result is Ok to return
+            return result
+        }
+    }
+
+    // ------ debug ------
+
+    override val bufferDebugString: String
+        get() = "(buffer:capacity=${buffer.size},size=$size)"
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
new file mode 100644
index 0000000..b118a89
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.internalAnnotations.Volatile
+import kotlinx.coroutines.experimental.selects.*
+
+/**
+ * Channel with array buffer of a fixed [capacity].
+ * Sender suspends only when buffer is fully and receiver suspends only when buffer is empty.
+ *
+ * This channel is created by `Channel(capacity)` factory function invocation.
+ *
+ * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
+ * The lists of suspended senders or receivers are lock-free.
+ */
+public open class ArrayChannel<E>(
+    /**
+     * Buffer capacity.
+     */
+    val capacity: Int
+) : AbstractChannel<E>() {
+    init {
+        require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
+    }
+
+    private val lock = ReentrantLock()
+    private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
+    private var head: Int = 0
+    @Volatile
+    private var size: Int = 0
+
+    protected final override val isBufferAlwaysEmpty: Boolean get() = false
+    protected final override val isBufferEmpty: Boolean get() = size == 0
+    protected final override val isBufferAlwaysFull: Boolean get() = false
+    protected final override val isBufferFull: Boolean get() = size == capacity
+
+    // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
+    protected override fun offerInternal(element: E): Any {
+        var receive: ReceiveOrClosed<E>? = null
+        var token: Any? = null
+        lock.withLock {
+            val size = this.size
+            closedForSend?.let { return it }
+            if (size < capacity) {
+                // tentatively put element to buffer
+                this.size = size + 1 // update size before checking queue (!!!)
+                // check for receivers that were waiting on empty queue
+                if (size == 0) {
+                    loop@ while (true) {
+                        receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
+                        if (receive is Closed) {
+                            this.size = size // restore size
+                            return receive!!
+                        }
+                        token = receive!!.tryResumeReceive(element, idempotent = null)
+                        if (token != null) {
+                            this.size = size // restore size
+                            return@withLock
+                        }
+                    }
+                }
+                buffer[(head + size) % capacity] = element // actually queue element
+                return OFFER_SUCCESS
+            }
+            // size == capacity: full
+            return OFFER_FAILED
+        }
+        // breaks here if offer meets receiver
+        receive!!.completeResumeReceive(token!!)
+        return receive!!.offerResult
+    }
+
+    // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
+    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+        var receive: ReceiveOrClosed<E>? = null
+        var token: Any? = null
+        lock.withLock {
+            val size = this.size
+            closedForSend?.let { return it }
+            if (size < capacity) {
+                // tentatively put element to buffer
+                this.size = size + 1 // update size before checking queue (!!!)
+                // check for receivers that were waiting on empty queue
+                if (size == 0) {
+                    loop@ while (true) {
+                        val offerOp = describeTryOffer(element)
+                        val failure = select.performAtomicTrySelect(offerOp)
+                        when {
+                            failure == null -> { // offered successfully
+                                this.size = size // restore size
+                                receive = offerOp.result
+                                token = offerOp.resumeToken
+                                check(token != null)
+                                return@withLock
+                            }
+                            failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
+                            failure === ALREADY_SELECTED || failure is Closed<*> -> {
+                                this.size = size // restore size
+                                return failure
+                            }
+                            else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
+                        }
+                    }
+                }
+                // let's try to select sending this element to buffer
+                if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
+                    this.size = size // restore size
+                    return ALREADY_SELECTED
+                }
+                buffer[(head + size) % capacity] = element // actually queue element
+                return OFFER_SUCCESS
+            }
+            // size == capacity: full
+            return OFFER_FAILED
+        }
+        // breaks here if offer meets receiver
+        receive!!.completeResumeReceive(token!!)
+        return receive!!.offerResult
+    }
+
+    // result is `E | POLL_FAILED | Closed`
+    protected override fun pollInternal(): Any? {
+        var send: Send? = null
+        var token: Any? = null
+        var result: Any? = null
+        lock.withLock {
+            val size = this.size
+            if (size == 0) return closedForSend ?: POLL_FAILED // when nothing can be read from buffer
+            // size > 0: not empty -- retrieve element
+            result = buffer[head]
+            buffer[head] = null
+            this.size = size - 1 // update size before checking queue (!!!)
+            // check for senders that were waiting on full queue
+            var replacement: Any? = POLL_FAILED
+            if (size == capacity) {
+                loop@ while (true) {
+                    send = takeFirstSendOrPeekClosed() ?: break
+                    token = send!!.tryResumeSend(idempotent = null)
+                    if (token != null) {
+                        replacement = send!!.pollResult
+                        break@loop
+                    }
+                }
+            }
+            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
+                this.size = size // restore size
+                buffer[(head + size) % capacity] = replacement
+            }
+            head = (head + 1) % capacity
+        }
+        // complete send the we're taken replacement from
+        if (token != null)
+            send!!.completeResumeSend(token!!)
+        return result
+    }
+
+    // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+    protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
+        var send: Send? = null
+        var token: Any? = null
+        var result: Any? = null
+        lock.withLock {
+            val size = this.size
+            if (size == 0) return closedForSend ?: POLL_FAILED
+            // size > 0: not empty -- retrieve element
+            result = buffer[head]
+            buffer[head] = null
+            this.size = size - 1 // update size before checking queue (!!!)
+            // check for senders that were waiting on full queue
+            var replacement: Any? = POLL_FAILED
+            if (size == capacity) {
+                loop@ while (true) {
+                    val pollOp = describeTryPoll()
+                    val failure = select.performAtomicTrySelect(pollOp)
+                    when {
+                        failure == null -> { // polled successfully
+                            send = pollOp.result
+                            token = pollOp.resumeToken
+                            check(token != null)
+                            replacement = send!!.pollResult
+                            break@loop
+                        }
+                        failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
+                        failure === ALREADY_SELECTED -> {
+                            this.size = size // restore size
+                            buffer[head] = result // restore head
+                            return failure
+                        }
+                        failure is Closed<*> -> {
+                            send = failure
+                            token = failure.tryResumeSend(idempotent = null)
+                            replacement = failure
+                            break@loop
+                        }
+                        else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
+                    }
+                }
+            }
+            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
+                this.size = size // restore size
+                buffer[(head + size) % capacity] = replacement
+            } else {
+                // failed to poll or is already closed --> let's try to select receiving this element from buffer
+                if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
+                    this.size = size // restore size
+                    buffer[head] = result // restore head
+                    return ALREADY_SELECTED
+                }
+            }
+            head = (head + 1) % capacity
+        }
+        // complete send the we're taken replacement from
+        if (token != null)
+            send!!.completeResumeSend(token!!)
+        return result
+    }
+
+    // Note: this function is invoked when channel is already closed
+    override fun cleanupSendQueueOnCancel() {
+        // clear buffer first
+        lock.withLock {
+            repeat(size) {
+                buffer[head] = 0
+                head = (head + 1) % capacity
+            }
+            size = 0
+        }
+        // then clean all queued senders
+        super.cleanupSendQueueOnCancel()
+    }
+
+    // ------ debug ------
+
+    override val bufferDebugString: String
+        get() = "(buffer:capacity=${buffer.size},size=$size)"
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt
new file mode 100644
index 0000000..0a9c589
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
+import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
+import kotlinx.coroutines.experimental.internal.Closeable
+
+/**
+ * Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
+ * that subscribe for the elements using [openSubscription] function and unsubscribe using [SubscriptionReceiveChannel.close]
+ * function.
+ *
+ * See `BroadcastChannel()` factory function for the description of available
+ * broadcast channel implementations.
+ */
+public interface BroadcastChannel<E> : SendChannel<E> {
+    /**
+     * Factory for broadcast channels.
+     * @suppress **Deprecated**
+     */
+    public companion object Factory {
+        /**
+         * Creates a broadcast channel with the specified buffer capacity.
+         * @suppress **Deprecated**
+         */
+        @Deprecated("Replaced with top-level function", level = DeprecationLevel.HIDDEN)
+        public operator fun <E> invoke(capacity: Int): BroadcastChannel<E> = BroadcastChannel(capacity)
+    }
+
+    /**
+     * Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
+     * The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this
+     * broadcast channel.
+     */
+    public fun openSubscription(): SubscriptionReceiveChannel<E>
+
+    /**
+     * @suppress **Deprecated**: Renamed to [openSubscription]
+     */
+    @Deprecated(message = "Renamed to `openSubscription`",
+        replaceWith = ReplaceWith("openSubscription()"))
+    public fun open(): SubscriptionReceiveChannel<E> = openSubscription()
+}
+
+/**
+ * Creates a broadcast channel with the specified buffer capacity.
+ *
+ * The resulting channel type depends on the specified [capacity] parameter:
+ * * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel];
+ * * when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
+ * * otherwise -- throws [IllegalArgumentException].
+ */
+public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
+    when (capacity) {
+        0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
+        UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
+        CONFLATED -> ConflatedBroadcastChannel()
+        else -> ArrayBroadcastChannel(capacity)
+    }
+
+/**
+ * Return type for [BroadcastChannel.openSubscription] that can be used to [receive] elements from the
+ * open subscription and to [close] it to unsubscribe.
+ *
+ * Note, that invocation of [cancel] also closes subscription.
+ */
+public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
+    /**
+     * Closes this subscription. This is a synonym for [cancel].
+     */
+    public override fun close() { cancel() }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
new file mode 100644
index 0000000..912f87e
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -0,0 +1,331 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CancellationException
+import kotlinx.coroutines.experimental.CoroutineScope
+import kotlinx.coroutines.experimental.Job
+import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
+import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
+import kotlinx.coroutines.experimental.selects.SelectClause1
+import kotlinx.coroutines.experimental.selects.SelectClause2
+import kotlinx.coroutines.experimental.selects.select
+import kotlinx.coroutines.experimental.yield
+
+/**
+ * Sender's interface to [Channel].
+ */
+public interface SendChannel<in E> {
+    /**
+     * Returns `true` if this channel was closed by invocation of [close] and thus
+     * the [send] and [offer] attempts throws exception.
+     */
+    public val isClosedForSend: Boolean
+
+    /**
+     * Returns `true` if the channel is full (out of capacity) and the [send] attempt will suspend.
+     * This function returns `false` for [isClosedForSend] channel.
+     */
+    public val isFull: Boolean
+
+    /**
+     * Adds [element] into to this channel, suspending the caller while this channel [isFull],
+     * or throws exception if the channel [isClosedForSend] (see [close] for details).
+     *
+     * Note, that closing a channel _after_ this function had suspended does not cause this suspended send invocation
+     * to abort, because closing a channel is conceptually like sending a special "close token" over this channel.
+     * All elements that are sent over the channel are delivered in first-in first-out order. The element that
+     * is being sent will get delivered to receivers before a close token.
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     *
+     * *Cancellation of suspended send is atomic* -- when this function
+     * throws [CancellationException] it means that the [element] was not sent to this channel.
+     * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+     * continue to execute even after it was cancelled from the same thread in the case when this send operation
+     * was already resumed and the continuation was posted for execution to the thread's queue.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     *
+     * This function can be used in [select] invocation with [onSend] clause.
+     * Use [offer] to try sending to this channel without waiting.
+     */
+    public suspend fun send(element: E)
+
+    /**
+     * Clause for [select] expression of [send] suspending function that selects when the element that is specified
+     * as parameter is sent to the channel. When the clause is selected the reference to this channel
+     * is passed into the corresponding block.
+     *
+     * The [select] invocation fails with exception if the channel [isClosedForSend] (see [close] for details).
+     */
+    public val onSend: SelectClause2<E, SendChannel<E>>
+
+    /**
+     * Adds [element] into this queue if it is possible to do so immediately without violating capacity restrictions
+     * and returns `true`. Otherwise, it returns `false` immediately
+     * or throws exception if the channel [isClosedForSend] (see [close] for details).
+     */
+    public fun offer(element: E): Boolean
+
+    /**
+     * Closes this channel with an optional exceptional [cause].
+     * This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
+     * Conceptually, its sends a special "close token" over this channel.
+     *
+     * Immediately after invocation of this function
+     * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
+     * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
+     * are received.
+     *
+     * A channel that was closed without a [cause] throws [ClosedSendChannelException] on attempts to send or receive.
+     * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
+     * receive on a failed channel throw the specified [cause] exception.
+     */
+    public fun close(cause: Throwable? = null): Boolean
+}
+
+/**
+ * Receiver's interface to [Channel].
+ */
+public interface ReceiveChannel<out E> {
+    /**
+     * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
+     * side and all previously sent items were already received, so that the [receive] attempt
+     * throws [ClosedReceiveChannelException]. If the channel was closed because of the exception, it
+     * is considered closed, too, but it is called a _failed_ channel. All suspending attempts to receive
+     * an element from a failed channel throw the original [close][SendChannel.close] cause exception.
+     */
+    public val isClosedForReceive: Boolean
+
+    /**
+     * Returns `true` if the channel is empty (contains no elements) and the [receive] attempt will suspend.
+     * This function returns `false` for [isClosedForReceive] channel.
+     */
+    public val isEmpty: Boolean
+
+    /**
+     * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
+     * or throws [ClosedReceiveChannelException] if the channel [isClosedForReceive].
+     * If the channel was closed because of the exception, it is called a _failed_ channel and this function
+     * throws the original [close][SendChannel.close] cause exception.
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     *
+     * *Cancellation of suspended receive is atomic* -- when this function
+     * throws [CancellationException] it means that the element was not retrieved from this channel.
+     * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+     * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+     * was already resumed and the continuation was posted for execution to the thread's queue.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     *
+     * This function can be used in [select] invocation with [onReceive] clause.
+     * Use [poll] to try receiving from this channel without waiting.
+     */
+    public suspend fun receive(): E
+
+    /**
+     * Clause for [select] expression of [receive] suspending function that selects with the element that
+     * is received from the channel.
+     * The [select] invocation fails with exception if the channel
+     * [isClosedForReceive] (see [close][SendChannel.close] for details).
+     */
+    public val onReceive: SelectClause1<E>
+
+    /**
+     * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
+     * or returns `null` if the channel is [closed][isClosedForReceive] without cause
+     * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     *
+     * *Cancellation of suspended receive is atomic* -- when this function
+     * throws [CancellationException] it means that the element was not retrieved from this channel.
+     * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+     * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+     * was already resumed and the continuation was posted for execution to the thread's queue.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     *
+     * This function can be used in [select] invocation with [onReceiveOrNull] clause.
+     * Use [poll] to try receiving from this channel without waiting.
+     */
+    public suspend fun receiveOrNull(): E?
+
+    /**
+     * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
+     * is received from the channel or selects with `null` if if the channel
+     * [isClosedForReceive] without cause. The [select] invocation fails with
+     * the original [close][SendChannel.close] cause exception if the channel has _failed_.
+     */
+    public val onReceiveOrNull: SelectClause1<E?>
+
+    /**
+     * Retrieves and removes the element from this channel, or returns `null` if this channel [isEmpty]
+     * or is [isClosedForReceive] without cause.
+     * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+     */
+    public fun poll(): E?
+
+    /**
+     * Returns new iterator to receive elements from this channels using `for` loop.
+     * Iteration completes normally when the channel is [isClosedForReceive] without cause and
+     * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+     */
+    public operator fun iterator(): ChannelIterator<E>
+
+    /**
+     * Cancels reception of remaining elements from this channel. This function closes the channel with
+     * the specified cause (unless it was already closed) and removes all buffered sent elements from it.
+     * This function returns `true` if the channel was not closed previously, or `false` otherwise.
+     *
+     * Immediately after invocation of this function [isClosedForReceive] and
+     * [isClosedForSend][SendChannel.isClosedForSend]
+     * on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
+     * afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
+     * [ClosedReceiveChannelException] if it was cancelled without a cause.
+     * A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
+     * receive on a failed channel throw the specified [cause] exception.
+     */
+    public fun cancel(cause: Throwable? = null): Boolean
+}
+
+/**
+ * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
+ * from concurrent coroutines.
+ */
+public interface ChannelIterator<out E> {
+    /**
+     * Returns `true` if the channel has more elements suspending the caller while this channel
+     * [isEmpty][ReceiveChannel.isEmpty] or returns `false` if the channel
+     * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause.
+     * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+     *
+     * This function retrieves and removes the element from this channel for the subsequent invocation
+     * of [next].
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     *
+     * *Cancellation of suspended receive is atomic* -- when this function
+     * throws [CancellationException] it means that the element was not retrieved from this channel.
+     * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+     * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+     * was already resumed and the continuation was posted for execution to the thread's queue.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     */
+    public suspend operator fun hasNext(): Boolean
+
+    /**
+     * Retrieves and removes the element from this channel suspending the caller while this channel
+     * [isEmpty][ReceiveChannel.isEmpty] or throws [ClosedReceiveChannelException] if the channel
+     * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause.
+     * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     *
+     * *Cancellation of suspended receive is atomic* -- when this function
+     * throws [CancellationException] it means that the element was not retrieved from this channel.
+     * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+     * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+     * was already resumed and the continuation was posted for execution to the thread's queue.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     */
+    public suspend operator fun next(): E
+}
+
+/**
+ * Channel is a non-blocking primitive for communication between sender using [SendChannel] and receiver using [ReceiveChannel].
+ * Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
+ * but it has suspending operations instead of blocking ones and it can be closed.
+ *
+ * See `Channel(capacity)` factory function for the description of available channel implementations.
+ */
+public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
+    /**
+     * Constants for channel factory function `Channel()`.
+     */
+    public companion object Factory {
+        /**
+         * Requests channel with unlimited capacity buffer in `Channel(...)` factory function --
+         * the [LinkedListChannel] gets created.
+         */
+        public const val UNLIMITED = Int.MAX_VALUE
+
+        /**
+         * Requests conflated channel in `Channel(...)` factory function --
+         * the [ConflatedChannel] gets created.
+         */
+        public const val CONFLATED = -1
+
+        /**
+         * Creates a channel with the specified buffer capacity (or without a buffer by default).
+         * @suppress **Deprecated**
+         */
+        @Deprecated("Replaced with top-level function", level = DeprecationLevel.HIDDEN)
+        public operator fun <E> invoke(capacity: Int = 0): Channel<E> = Channel(capacity)
+    }
+}
+
+/**
+ * Creates a channel without a buffer -- [RendezvousChannel].
+ */
+public fun <E> Channel(): Channel<E> = RendezvousChannel<E>()
+
+/**
+ * Creates a channel with the specified buffer capacity (or without a buffer by default).
+ *
+ * The resulting channel type depends on the specified [capacity] parameter:
+ * * when `capacity` is 0 -- creates [RendezvousChannel] without a buffer;
+ * * when `capacity` is [Channel.UNLIMITED] -- creates [LinkedListChannel] with buffer of unlimited size;
+ * * when `capacity` is [Channel.CONFLATED] -- creates [ConflatedChannel] that conflates back-to-back sends;
+ * * when `capacity` is positive, but less than [UNLIMITED] -- creates [ArrayChannel] with a buffer of the specified `capacity`;
+ * * otherwise -- throws [IllegalArgumentException].
+ */
+public fun <E> Channel(capacity: Int): Channel<E> =
+    when (capacity) {
+        0 -> RendezvousChannel()
+        UNLIMITED -> LinkedListChannel()
+        CONFLATED -> ConflatedChannel()
+        else -> ArrayChannel(capacity)
+    }
+
+/**
+ * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel
+ * that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
+ * exception on send attempts.
+ */
+public class ClosedSendChannelException(message: String?) : CancellationException(message)
+
+/**
+ * Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
+ * channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
+ * exception on receive attempts.
+ */
+public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
new file mode 100644
index 0000000..a23c2dc
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+internal open class ChannelCoroutine<E>(
+    parentContext: CoroutineContext,
+    protected val _channel: Channel<E>,
+    active: Boolean
+) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
+    val channel: Channel<E>
+        get() = this
+
+    // Workaround for KT-23094
+    override suspend fun receive(): E = _channel.receive()
+
+    override suspend fun send(element: E) = _channel.send(element)
+
+    override suspend fun receiveOrNull(): E? = _channel.receiveOrNull()
+
+    override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
new file mode 100644
index 0000000..ecb4262
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
@@ -0,0 +1,1551 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
+
+
+// -------- Conversions to ReceiveChannel  --------
+
+/**
+ * Returns a channel to read all element of the [Iterable].
+ */
+public fun <E> Iterable<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> =
+    produce(context) {
+        for (element in this@asReceiveChannel)
+            send(element)
+    }
+
+/**
+ * Returns a channel to read all element of the [Sequence].
+ */
+public fun <E> Sequence<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> =
+    produce(context) {
+        for (element in this@asReceiveChannel)
+            send(element)
+    }
+
+// -------- Operations on BroadcastChannel --------
+
+/**
+ * Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements
+ * from it by always invoking [cancel][SubscriptionReceiveChannel.cancel] after the execution of the block.
+ */
+public inline fun <E, R> BroadcastChannel<E>.consume(block: SubscriptionReceiveChannel<E>.() -> R): R {
+    val channel = openSubscription()
+    try {
+        return channel.block()
+    } finally {
+        channel.cancel()
+    }
+}
+
+/**
+ * Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
+ */
+public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit) =
+    consume {
+        for (element in this) action(element)
+    }
+
+/**
+ * @suppress: **Deprecated**: binary compatibility with old code
+ */
+@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> BroadcastChannel<E>.consumeEach(action: suspend (E) -> Unit) =
+    consumeEach { action(it) }
+
+// -------- Operations on ReceiveChannel --------
+
+/**
+ * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on the [ReceiveChannel]
+ * with the corresponding cause. See also [ReceiveChannel.consume].
+ *
+ * **WARNING**: It is planned that in the future a second invocation of this method
+ * on an channel that is already being consumed is going to fail fast, that is
+ * immediately throw an [IllegalStateException].
+ * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
+ * for details.
+ */
+public fun ReceiveChannel<*>.consumes(): CompletionHandler =
+    { cause: Throwable? -> cancel(cause) }
+
+/**
+ * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on all the
+ * specified [ReceiveChannel] instances with the corresponding cause.
+ * See also [ReceiveChannel.consumes()] for a version on one channel.
+ */
+public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
+    { cause: Throwable? ->
+        var exception: Throwable? = null
+        for (channel in channels)
+            try {
+                channel.cancel(cause)
+            } catch (e: Throwable) {
+                if (exception == null) {
+                    exception = e
+                } else {
+                    exception.addSuppressedThrowable(e)
+                }
+            }
+        exception?.let { throw it }
+    }
+
+/**
+ * Makes sure that the given [block] consumes all elements from the given channel
+ * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
+ *
+ * **WARNING**: It is planned that in the future a second invocation of this method
+ * on an channel that is already being consumed is going to fail fast, that is
+ * immediately throw an [IllegalStateException].
+ * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
+ * for details.
+ *
+ * The operation is _terminal_.
+ */
+public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
+    var cause: Throwable? = null
+    try {
+        return block()
+    } catch (e: Throwable) {
+        cause = e
+        throw e
+    } finally {
+        cancel(cause)
+    }
+}
+
+/**
+ * Performs the given [action] for each received element.
+ *
+ * **WARNING**: It is planned that in the future a second invocation of this method
+ * on an channel that is already being consumed is going to fail fast, that is
+ * immediately throw an [IllegalStateException].
+ * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
+ * for details.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) =
+    consume {
+        for (e in this) action(e)
+    }
+
+/**
+ * @suppress: **Deprecated**: binary compatibility with old code
+ */
+@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit) =
+    consumeEach { action(it) }
+
+/**
+ * Performs the given [action] for each received element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) {
+    var index = 0
+    consumeEach {
+        action(IndexedValue(index++, it))
+    }
+}
+
+/**
+ * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E =
+    elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") }
+
+/**
+ * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E =
+    consume {
+        if (index < 0)
+            return defaultValue(index)
+        var count = 0
+        for (element in this) {
+            if (index == count++)
+                return element
+        }
+        return defaultValue(index)
+    }
+
+/**
+ * Returns an element at the given [index] or `null` if the [index] is out of bounds of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
+    consume {
+        if (index < 0)
+            return null
+        var count = 0
+        for (element in this) {
+            if (index == count++)
+                return element
+        }
+        return null
+    }
+
+/**
+ * Returns the first element matching the given [predicate], or `null` if no such element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? =
+    firstOrNull(predicate)
+
+/**
+ * Returns the last element matching the given [predicate], or `null` if no such element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? =
+    lastOrNull(predicate)
+
+/**
+ * Returns first element.
+ * @throws [NoSuchElementException] if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.first(): E =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext())
+            throw NoSuchElementException("ReceiveChannel is empty.")
+        return iterator.next()
+    }
+
+/**
+ * Returns the first element matching the given [predicate].
+ * @throws [NoSuchElementException] if no such element is found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E {
+    consumeEach {
+        if (predicate(it)) return it
+    }
+    throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
+}
+
+/**
+ * Returns the first element, or `null` if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext())
+            return null
+        return iterator.next()
+    }
+
+/**
+ * Returns the first element matching the given [predicate], or `null` if element was not found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.firstOrNull(predicate: (E) -> Boolean): E? {
+    consumeEach {
+        if (predicate(it)) return it
+    }
+    return null
+}
+
+/**
+ * Returns first index of [element], or -1 if the channel does not contain element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
+    var index = 0
+    consumeEach {
+        if (element == it)
+            return index
+        index++
+    }
+    return -1
+}
+
+/**
+ * Returns index of the first element matching the given [predicate], or -1 if the channel does not contain such element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int {
+    var index = 0
+    consumeEach {
+        if (predicate(it))
+            return index
+        index++
+    }
+    return -1
+}
+
+/**
+ * Returns index of the last element matching the given [predicate], or -1 if the channel does not contain such element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.indexOfLast(predicate: (E) -> Boolean): Int {
+    var lastIndex = -1
+    var index = 0
+    consumeEach {
+        if (predicate(it))
+            lastIndex = index
+        index++
+    }
+    return lastIndex
+}
+
+/**
+ * Returns the last element.
+ * @throws [NoSuchElementException] if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.last(): E =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext())
+            throw NoSuchElementException("ReceiveChannel is empty.")
+        var last = iterator.next()
+        while (iterator.hasNext())
+            last = iterator.next()
+        return last
+    }
+
+/**
+ * Returns the last element matching the given [predicate].
+ * @throws [NoSuchElementException] if no such element is found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E {
+    var last: E? = null
+    var found = false
+    consumeEach {
+        if (predicate(it)) {
+            last = it
+            found = true
+        }
+    }
+    if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
+    @Suppress("UNCHECKED_CAST")
+    return last as E
+}
+
+/**
+ * Returns last index of [element], or -1 if the channel does not contain element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
+    var lastIndex = -1
+    var index = 0
+    consumeEach {
+        if (element == it)
+            lastIndex = index
+        index++
+    }
+    return lastIndex
+}
+
+/**
+ * Returns the last element, or `null` if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext())
+            return null
+        var last = iterator.next()
+        while (iterator.hasNext())
+            last = iterator.next()
+        return last
+    }
+
+/**
+ * Returns the last element matching the given [predicate], or `null` if no such element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? {
+    var last: E? = null
+    consumeEach {
+        if (predicate(it)) {
+            last = it
+        }
+    }
+    return last
+}
+
+/**
+ * Returns the single element, or throws an exception if the channel is empty or has more than one element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.single(): E =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext())
+            throw NoSuchElementException("ReceiveChannel is empty.")
+        val single = iterator.next()
+        if (iterator.hasNext())
+            throw IllegalArgumentException("ReceiveChannel has more than one element.")
+        return single
+    }
+
+/**
+ * Returns the single element matching the given [predicate], or throws exception if there is no or more than one matching element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E {
+    var single: E? = null
+    var found = false
+    consumeEach {
+        if (predicate(it)) {
+            if (found) throw IllegalArgumentException("ReceiveChannel contains more than one matching element.")
+            single = it
+            found = true
+        }
+    }
+    if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
+    @Suppress("UNCHECKED_CAST")
+    return single as E
+}
+
+/**
+ * Returns single element, or `null` if the channel is empty or has more than one element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext())
+            return null
+        val single = iterator.next()
+        if (iterator.hasNext())
+            return null
+        return single
+    }
+
+/**
+ * Returns the single element matching the given [predicate], or `null` if element was not found or more than one element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? {
+    var single: E? = null
+    var found = false
+    consumeEach {
+        if (predicate(it)) {
+            if (found) return null
+            single = it
+            found = true
+        }
+    }
+    if (!found) return null
+    return single
+}
+
+/**
+ * Returns a channel containing all elements except first [n] elements.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
+    produce(context, onCompletion = consumes()) {
+        require(n >= 0) { "Requested element count $n is less than zero." }
+        var remaining: Int = n
+        if (remaining > 0)
+            for (e in this@drop) {
+                remaining--
+                if (remaining == 0)
+                    break
+            }
+        for (e in this@drop) {
+            send(e)
+        }
+    }
+
+/**
+ * Returns a channel containing all elements except first elements that satisfy the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+    produce(context, onCompletion = consumes()) {
+        for (e in this@dropWhile) {
+            if (!predicate(e)) {
+                send(e)
+                break
+            }
+        }
+        for (e in this@dropWhile) {
+            send(e)
+        }
+    }
+
+/**
+ * Returns a channel containing only elements matching the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+    produce(context, onCompletion = consumes()) {
+        for (e in this@filter) {
+            if (predicate(e)) send(e)
+        }
+    }
+
+/**
+ * Returns a channel containing only elements matching the given [predicate].
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> =
+    produce(context, onCompletion = consumes()) {
+        var index = 0
+        for (e in this@filterIndexed) {
+            if (predicate(index++, e)) send(e)
+        }
+    }
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
+    consumeEachIndexed { (index, element) ->
+        if (predicate(index, element)) destination.add(element)
+    }
+    return destination
+}
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
+    consumeEachIndexed { (index, element) ->
+        if (predicate(index, element)) destination.send(element)
+    }
+    return destination
+}
+
+/**
+ * Returns a channel containing all elements not matching the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.filterNot(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+    filter(context) { !predicate(it) }
+
+/**
+ * @suppress **Deprecated**: For binary compatibility only
+ */
+@Deprecated("For binary compatibility only", level = DeprecationLevel.HIDDEN)
+public fun <E> ReceiveChannel<E>.filterNot(predicate: suspend (E) -> Boolean): ReceiveChannel<E> = filterNot(predicate = predicate)
+
+/**
+ * Returns a channel containing all elements that are not `null`.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+@Suppress("UNCHECKED_CAST")
+public fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
+    filter { it != null } as ReceiveChannel<E>
+
+/**
+ * Appends all elements that are not `null` to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
+    consumeEach {
+        if (it != null) destination.add(it)
+    }
+    return destination
+}
+
+/**
+ * Appends all elements that are not `null` to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
+    consumeEach {
+        if (it != null) destination.send(it)
+    }
+    return destination
+}
+
+/**
+ * Appends all elements not matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
+    consumeEach {
+        if (!predicate(it)) destination.add(it)
+    }
+    return destination
+}
+
+/**
+ * Appends all elements not matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
+    consumeEach {
+        if (!predicate(it)) destination.send(it)
+    }
+    return destination
+}
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
+    consumeEach {
+        if (predicate(it)) destination.add(it)
+    }
+    return destination
+}
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
+    consumeEach {
+        if (predicate(it)) destination.send(it)
+    }
+    return destination
+}
+
+/**
+ * Returns a channel containing first [n] elements.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
+    produce(context, onCompletion = consumes()) {
+        if (n == 0) return@produce
+        require(n >= 0) { "Requested element count $n is less than zero." }
+        var remaining: Int = n
+        for (e in this@take) {
+            send(e)
+            remaining--
+            if (remaining == 0)
+                return@produce
+        }
+    }
+
+/**
+ * Returns a channel containing first elements satisfying the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+    produce(context, onCompletion = consumes()) {
+        for (e in this@takeWhile) {
+            if (!predicate(e)) return@produce
+            send(e)
+        }
+    }
+
+/**
+ * Returns a [Map] containing key-value pairs provided by [transform] function
+ * applied to elements of the given channel.
+ *
+ * If any of two pairs would have the same key the last one gets added to the map.
+ *
+ * The returned map preserves the entry iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> =
+    associateTo(LinkedHashMap(), transform)
+
+/**
+ * Returns a [Map] containing the elements from the given channel indexed by the key
+ * returned from [keySelector] function applied to each element.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The returned map preserves the entry iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> =
+    associateByTo(LinkedHashMap(), keySelector)
+
+/**
+ * Returns a [Map] containing the values provided by [valueTransform] and indexed by [keySelector] functions applied to elements of the given channel.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The returned map preserves the entry iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> =
+    associateByTo(LinkedHashMap(), keySelector, valueTransform)
+
+/**
+ * Populates and returns the [destination] mutable map with key-value pairs,
+ * where key is provided by the [keySelector] function applied to each element of the given channel
+ * and value is the element itself.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M {
+    consumeEach {
+        destination.put(keySelector(it), it)
+    }
+    return destination
+}
+
+/**
+ * Populates and returns the [destination] mutable map with key-value pairs,
+ * where key is provided by the [keySelector] function and
+ * and value is provided by the [valueTransform] function applied to elements of the given channel.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
+    consumeEach {
+        destination.put(keySelector(it), valueTransform(it))
+    }
+    return destination
+}
+
+/**
+ * Populates and returns the [destination] mutable map with key-value pairs
+ * provided by [transform] function applied to each element of the given channel.
+ *
+ * If any of two pairs would have the same key the last one gets added to the map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M {
+    consumeEach {
+        destination += transform(it)
+    }
+    return destination
+}
+
+/**
+ * Send each element of the original channel
+ * and appends the results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
+    consumeEach {
+        destination.send(it)
+    }
+    return destination
+}
+
+/**
+ * Appends all elements to the given [destination] collection.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
+    consumeEach {
+        destination.add(it)
+    }
+    return destination
+}
+
+/**
+ * Returns a [List] containing all elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toList(): List<E> =
+    this.toMutableList()
+
+/**
+ * Returns a [Map] filled with all elements of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> =
+    toMap(LinkedHashMap())
+
+/**
+ * Returns a [MutableMap] filled with all elements of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
+    consumeEach {
+        destination += it
+    }
+    return destination
+}
+
+/**
+ * Returns a [MutableList] filled with all elements of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> =
+    toCollection(ArrayList())
+
+/**
+ * Returns a [Set] of all elements.
+ *
+ * The returned set preserves the element iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> =
+    this.toMutableSet()
+
+/**
+ * Returns a single channel of all elements from results of [transform] function being invoked on each element of original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
+    produce(context, onCompletion = consumes()) {
+        for (e in this@flatMap) {
+            transform(e).toChannel(this)
+        }
+    }
+
+/**
+ * Groups elements of the original channel by the key returned by the given [keySelector] function
+ * applied to each element and returns a map where each group key is associated with a list of corresponding elements.
+ *
+ * The returned map preserves the entry iteration order of the keys produced from the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> =
+    groupByTo(LinkedHashMap(), keySelector)
+
+/**
+ * Groups values returned by the [valueTransform] function applied to each element of the original channel
+ * by the key returned by the given [keySelector] function applied to the element
+ * and returns a map where each group key is associated with a list of corresponding values.
+ *
+ * The returned map preserves the entry iteration order of the keys produced from the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> =
+    groupByTo(LinkedHashMap(), keySelector, valueTransform)
+
+/**
+ * Groups elements of the original channel by the key returned by the given [keySelector] function
+ * applied to each element and puts to the [destination] map each group key associated with a list of corresponding elements.
+ *
+ * @return The [destination] map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M {
+    consumeEach {
+        val key = keySelector(it)
+        val list = destination.getOrPut(key) { ArrayList() }
+        list.add(it)
+    }
+    return destination
+}
+
+/**
+ * Groups values returned by the [valueTransform] function applied to each element of the original channel
+ * by the key returned by the given [keySelector] function applied to the element
+ * and puts to the [destination] map each group key associated with a list of corresponding values.
+ *
+ * @return The [destination] map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
+    consumeEach {
+        val key = keySelector(it)
+        val list = destination.getOrPut(key) { ArrayList() }
+        list.add(valueTransform(it))
+    }
+    return destination
+}
+
+/**
+ * Returns a channel containing the results of applying the given [transform] function
+ * to each element in the original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
+    produce(context, onCompletion = consumes()) {
+        consumeEach {
+            send(transform(it))
+        }
+    }
+
+/**
+ * Returns a channel containing the results of applying the given [transform] function
+ * to each element and its index in the original channel.
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> =
+    produce(context, onCompletion = consumes()) {
+        var index = 0
+        for (e in this@mapIndexed) {
+            send(transform(index++, e))
+        }
+    }
+
+/**
+ * Returns a channel containing only the non-null results of applying the given [transform] function
+ * to each element and its index in the original channel.
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(context: CoroutineContext = Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel<R> =
+    mapIndexed(context, transform).filterNotNull()
+
+/**
+ * Applies the given [transform] function to each element and its index in the original channel
+ * and appends only the non-null results to the given [destination].
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
+    consumeEachIndexed { (index, element) ->
+        transform(index, element)?.let { destination.add(it) }
+    }
+    return destination
+}
+
+/**
+ * Applies the given [transform] function to each element and its index in the original channel
+ * and appends only the non-null results to the given [destination].
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
+    consumeEachIndexed { (index, element) ->
+        transform(index, element)?.let { destination.send(it) }
+    }
+    return destination
+}
+
+/**
+ * Applies the given [transform] function to each element and its index in the original channel
+ * and appends the results to the given [destination].
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
+    var index = 0
+    consumeEach {
+        destination.add(transform(index++, it))
+    }
+    return destination
+}
+
+/**
+ * Applies the given [transform] function to each element and its index in the original channel
+ * and appends the results to the given [destination].
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
+    var index = 0
+    consumeEach {
+        destination.send(transform(index++, it))
+    }
+    return destination
+}
+
+/**
+ * Returns a channel containing only the non-null results of applying the given [transform] function
+ * to each element in the original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(context: CoroutineContext = Unconfined, transform: suspend (E) -> R?): ReceiveChannel<R> =
+    map(context, transform).filterNotNull()
+
+/**
+ * Applies the given [transform] function to each element in the original channel
+ * and appends only the non-null results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
+    consumeEach {
+        transform(it)?.let { destination.add(it) }
+    }
+    return destination
+}
+
+/**
+ * Applies the given [transform] function to each element in the original channel
+ * and appends only the non-null results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
+    consumeEach {
+        transform(it)?.let { destination.send(it) }
+    }
+    return destination
+}
+
+/**
+ * Applies the given [transform] function to each element of the original channel
+ * and appends the results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
+    consumeEach {
+        destination.add(transform(it))
+    }
+    return destination
+}
+
+/**
+ * Applies the given [transform] function to each element of the original channel
+ * and appends the results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
+    consumeEach {
+        destination.send(transform(it))
+    }
+    return destination
+}
+
+/**
+ * Returns a channel of [IndexedValue] for each element of the original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Unconfined): ReceiveChannel<IndexedValue<E>> =
+    produce(context, onCompletion = consumes()) {
+        var index = 0
+        for (e in this@withIndex) {
+            send(IndexedValue(index++, e))
+        }
+    }
+
+/**
+ * Returns a channel containing only distinct elements from the given channel.
+ *
+ * The elements in the resulting channel are in the same order as they were in the source channel.
+ *
+ * The operation is _intermediate_ and _stateful_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> =
+    this.distinctBy { it }
+
+/**
+ * Returns a channel containing only elements from the given channel
+ * having distinct keys returned by the given [selector] function.
+ *
+ * The elements in the resulting channel are in the same order as they were in the source channel.
+ *
+ * The operation is _intermediate_ and _stateful_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> =
+    produce(context, onCompletion = consumes()) {
+        val keys = HashSet<K>()
+        for (e in this@distinctBy) {
+            val k = selector(e)
+            if (k !in keys) {
+                send(e)
+                keys += k
+            }
+        }
+    }
+
+/**
+ * Returns a mutable set containing all distinct elements from the given channel.
+ *
+ * The returned set preserves the element iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> =
+    toCollection(LinkedHashSet())
+
+/**
+ * Returns `true` if all elements match the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean {
+    consumeEach {
+        if (!predicate(it)) return false
+    }
+    return true
+}
+
+/**
+ * Returns `true` if channel has at least one element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
+    consume {
+        return iterator().hasNext()
+    }
+
+/**
+ * Returns `true` if at least one element matches the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean {
+    consumeEach {
+        if (predicate(it)) return true
+    }
+    return false
+}
+
+/**
+ * Returns the number of elements in this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.count(): Int {
+    var count = 0
+    consumeEach { count++ }
+    return count
+}
+
+/**
+ * Returns the number of elements matching the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int {
+    var count = 0
+    consumeEach {
+        if (predicate(it)) count++
+    }
+    return count
+}
+
+/**
+ * Accumulates value starting with [initial] value and applying [operation] from left to right to current accumulator value and each element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
+    var accumulator = initial
+    consumeEach {
+        accumulator = operation(accumulator, it)
+    }
+    return accumulator
+}
+
+/**
+ * Accumulates value starting with [initial] value and applying [operation] from left to right
+ * to current accumulator value and each element with its index in the original channel.
+ * @param [operation] function that takes the index of an element, current accumulator value
+ * and the element itself, and calculates the next accumulator value.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R {
+    var index = 0
+    var accumulator = initial
+    consumeEach {
+        accumulator = operation(index++, accumulator, it)
+    }
+    return accumulator
+}
+
+/**
+ * Returns the first element yielding the largest value of the given function or `null` if there are no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext()) return null
+        var maxElem = iterator.next()
+        var maxValue = selector(maxElem)
+        while (iterator.hasNext()) {
+            val e = iterator.next()
+            val v = selector(e)
+            if (maxValue < v) {
+                maxElem = e
+                maxValue = v
+            }
+        }
+        return maxElem
+    }
+
+/**
+ * Returns the first element having the largest value according to the provided [comparator] or `null` if there are no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext()) return null
+        var max = iterator.next()
+        while (iterator.hasNext()) {
+            val e = iterator.next()
+            if (comparator.compare(max, e) < 0) max = e
+        }
+        return max
+    }
+
+/**
+ * Returns the first element yielding the smallest value of the given function or `null` if there are no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext()) return null
+        var minElem = iterator.next()
+        var minValue = selector(minElem)
+        while (iterator.hasNext()) {
+            val e = iterator.next()
+            val v = selector(e)
+            if (minValue > v) {
+                minElem = e
+                minValue = v
+            }
+        }
+        return minElem
+    }
+
+/**
+ * Returns the first element having the smallest value according to the provided [comparator] or `null` if there are no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext()) return null
+        var min = iterator.next()
+        while (iterator.hasNext()) {
+            val e = iterator.next()
+            if (comparator.compare(min, e) > 0) min = e
+        }
+        return min
+    }
+
+/**
+ * Returns `true` if the channel has no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
+    consume {
+        return !iterator().hasNext()
+    }
+
+/**
+ * Returns `true` if no elements match the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean {
+    consumeEach {
+        if (predicate(it)) return false
+    }
+    return true
+}
+
+/**
+ * Accumulates value starting with the first element and applying [operation] from left to right to current accumulator value and each element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S =
+    consume {
+        val iterator = this.iterator()
+        if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
+        var accumulator: S = iterator.next()
+        while (iterator.hasNext()) {
+            accumulator = operation(accumulator, iterator.next())
+        }
+        return accumulator
+    }
+
+/**
+ * Accumulates value starting with the first element and applying [operation] from left to right
+ * to current accumulator value and each element with its index in the original channel.
+ * @param [operation] function that takes the index of an element, current accumulator value
+ * and the element itself and calculates the next accumulator value.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark operation with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public suspend inline fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S =
+    consume {
+        val iterator = this.iterator()
+        if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
+        var index = 1
+        var accumulator: S = iterator.next()
+        while (iterator.hasNext()) {
+            accumulator = operation(index++, accumulator, iterator.next())
+        }
+        return accumulator
+    }
+
+/**
+ * Returns the sum of all values produced by [selector] function applied to each element in the channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int {
+    var sum = 0
+    consumeEach {
+        sum += selector(it)
+    }
+    return sum
+}
+
+/**
+ * Returns the sum of all values produced by [selector] function applied to each element in the channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double {
+    var sum = 0.0
+    consumeEach {
+        sum += selector(it)
+    }
+    return sum
+}
+
+/**
+ * Returns an original collection containing all the non-`null` elements, throwing an [IllegalArgumentException] if there are any `null` elements.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
+    map { it ?: throw IllegalArgumentException("null element found in $this.") }
+
+/**
+ * Splits the original channel into pair of lists,
+ * where *first* list contains elements for which [predicate] yielded `true`,
+ * while *second* list contains elements for which [predicate] yielded `false`.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> {
+    val first = ArrayList<E>()
+    val second = ArrayList<E>()
+    consumeEach {
+        if (predicate(it)) {
+            first.add(it)
+        } else {
+            second.add(it)
+        }
+    }
+    return Pair(first, second)
+}
+
+/**
+ * Returns a channel of pairs built from elements of both channels with same indexes.
+ * Resulting channel has length of shortest input channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of both the original [ReceiveChannel] and the `other` one.
+ */
+public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
+    zip(other) { t1, t2 -> t1 to t2 }
+
+/**
+ * Returns a channel of values built from elements of both collections with same indexes using provided [transform]. Resulting channel has length of shortest input channels.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of both the original [ReceiveChannel] and the `other` one.
+ */
+// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
+    produce(context, onCompletion = consumesAll(this, other)) {
+        val otherIterator = other.iterator()
+        this@zip.consumeEach { element1 ->
+            if (!otherIterator.hasNext()) return@consumeEach
+            val element2 = otherIterator.next()
+            send(transform(element1, element2))
+        }
+    }
+
+
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
new file mode 100644
index 0000000..a8ec1ff
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.internalAnnotations.*
+import kotlinx.coroutines.experimental.intrinsics.*
+import kotlinx.coroutines.experimental.selects.*
+
+/**
+ * Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
+ *
+ * Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received,
+ * while previously sent elements **are lost**.
+ * Every subscriber immediately receives the most recently sent element.
+ * Sender to this broadcast channel never suspends and [offer] always returns `true`.
+ *
+ * A secondary constructor can be used to create an instance of this class that already holds a value.
+ * This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
+ *
+ * This implementation is fully lock-free. In this implementation
+ * [opening][openSubscription] and [closing][SubscriptionReceiveChannel.close] subscription takes O(N) time, where N is the
+ * number of subscribers.
+ */
+public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
+    /**
+     * Creates an instance of this class that already holds a value.
+     *
+     * It is as a shortcut to creating an instance with a default constructor and
+     * immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
+     */
+    constructor(value: E) : this() {
+        _state.lazySet(State<E>(value, null))
+    }
+
+    private val _state = atomic<Any>(INITIAL_STATE) // State | Closed
+    private val _updating = atomic(0)
+
+    private companion object {
+        @JvmField
+        val CLOSED = Closed(null)
+
+        @JvmField
+        val UNDEFINED = Symbol("UNDEFINED")
+
+        @JvmField
+        val INITIAL_STATE = State<Any?>(UNDEFINED, null)
+    }
+
+    private class State<E>(
+        @JvmField val value: Any?, // UNDEFINED | E
+        @JvmField val subscribers: Array<Subscriber<E>>?
+    )
+
+    private class Closed(@JvmField val closeCause: Throwable?) {
+        val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
+        val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
+    }
+
+    /**
+     * The most recently sent element to this channel.
+     *
+     * Access to this property throws [IllegalStateException] when this class is constructed without
+     * initial value and no value was sent yet or if it was [closed][close] without a cause.
+     * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+     */
+    @Suppress("UNCHECKED_CAST")
+    public val value: E get() {
+        _state.loop { state ->
+            when (state) {
+                is Closed -> throw state.valueException
+                is State<*> -> {
+                    if (state.value === UNDEFINED) throw IllegalStateException("No value")
+                    return state.value as E
+                }
+                else -> error("Invalid state $state")
+            }
+        }
+    }
+
+    /**
+     * The most recently sent element to this channel or `null` when this class is constructed without
+     * initial value and no value was sent yet or if it was [closed][close].
+     */
+    @Suppress("UNCHECKED_CAST")
+    public val valueOrNull: E? get() {
+        val state = _state.value
+        when (state) {
+            is Closed -> return null
+            is State<*> -> {
+                if (state.value === UNDEFINED) return null
+                return state.value as E
+            }
+            else -> error("Invalid state $state")
+        }
+    }
+
+    override val isClosedForSend: Boolean get() = _state.value is Closed
+    override val isFull: Boolean get() = false
+
+    @Suppress("UNCHECKED_CAST")
+    override fun openSubscription(): SubscriptionReceiveChannel<E> {
+        val subscriber = Subscriber<E>(this)
+        _state.loop { state ->
+            when (state) {
+                is Closed -> {
+                    subscriber.close(state.closeCause)
+                    return subscriber
+                }
+                is State<*> -> {
+                    if (state.value !== UNDEFINED)
+                        subscriber.offerInternal(state.value as E)
+                    val update = State(state.value, addSubscriber((state as State<E>).subscribers, subscriber))
+                    if (_state.compareAndSet(state, update))
+                        return subscriber
+                }
+                else -> error("Invalid state $state")
+            }
+        }
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private fun closeSubscriber(subscriber: Subscriber<E>) {
+        _state.loop { state ->
+            when (state) {
+                is Closed -> return
+                is State<*> -> {
+                    val update = State(state.value, removeSubscriber((state as State<E>).subscribers!!, subscriber))
+                    if (_state.compareAndSet(state, update))
+                        return
+                }
+                else -> error("Invalid state $state")
+            }
+        }
+    }
+
+    private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
+        if (list == null) return Array<Subscriber<E>>(1) { subscriber }
+        return list + subscriber
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private fun removeSubscriber(list: Array<Subscriber<E>>, subscriber: Subscriber<E>): Array<Subscriber<E>>? {
+        val n = list.size
+        val i = list.indexOf(subscriber)
+        check(i >= 0)
+        if (n == 1) return null
+        val update = arrayOfNulls<Subscriber<E>>(n - 1)
+        arraycopy(list, 0, update, 0, i)
+        arraycopy(list, i + 1, update, i, n - i - 1)
+        return update as Array<Subscriber<E>>
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    override fun close(cause: Throwable?): Boolean {
+        _state.loop { state ->
+            when (state) {
+                is Closed -> return false
+                is State<*> -> {
+                    val update = if (cause == null) CLOSED else Closed(cause)
+                    if (_state.compareAndSet(state, update)) {
+                        (state as State<E>).subscribers?.forEach { it.close(cause) }
+                        return true
+                    }
+                }
+                else -> error("Invalid state $state")
+            }
+        }
+    }
+
+    /**
+     * Sends the value to all subscribed receives and stores this value as the most recent state for
+     * future subscribers. This implementation never suspends.
+     * It throws exception if the channel [isClosedForSend] (see [close] for details).
+     */
+    suspend override fun send(element: E) {
+        offerInternal(element)?.let { throw it.sendException }
+    }
+
+    /**
+     * Sends the value to all subscribed receives and stores this value as the most recent state for
+     * future subscribers. This implementation always returns `true`.
+     * It throws exception if the channel [isClosedForSend] (see [close] for details).
+     */
+    override fun offer(element: E): Boolean {
+        offerInternal(element)?.let { throw it.sendException }
+        return true
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private fun offerInternal(element: E): Closed? {
+        // If some other thread is updating the state in its offer operation we assume that our offer had linearized
+        // before that offer (we lost) and that offer overwrote us and conflated our offer.
+        if (!_updating.compareAndSet(0, 1)) return null
+        try {
+            _state.loop { state ->
+                when (state) {
+                    is Closed -> return state
+                    is State<*> -> {
+                        val update = State(element, (state as State<E>).subscribers)
+                        if (_state.compareAndSet(state, update)) {
+                            // Note: Using offerInternal here to ignore the case when this subscriber was
+                            // already concurrently closed (assume the close had conflated our offer for this
+                            // particular subscriber).
+                            state.subscribers?.forEach { it.offerInternal(element) }
+                            return null
+                        }
+                    }
+                    else -> error("Invalid state $state")
+                }
+            }
+        } finally {
+            _updating.value = 0 // reset the updating flag to zero even when something goes wrong
+        }
+    }
+
+    override val onSend: SelectClause2<E, SendChannel<E>>
+        get() = object : SelectClause2<E, SendChannel<E>> {
+            override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
+                registerSelectSend(select, param, block)
+            }
+        }
+
+    private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
+        if (!select.trySelect(null)) return
+        offerInternal(element)?.let {
+            select.resumeSelectCancellableWithException(it.sendException)
+            return
+        }
+        block.startCoroutineUndispatched(receiver = this, completion = select.completion)
+    }
+
+    private class Subscriber<E>(
+        private val broadcastChannel: ConflatedBroadcastChannel<E>
+    ) : ConflatedChannel<E>(), SubscriptionReceiveChannel<E> {
+        override fun cancel(cause: Throwable?): Boolean =
+            close(cause).also { closed ->
+                if (closed) broadcastChannel.closeSubscriber(this)
+            }
+
+        public override fun offerInternal(element: E): Any = super.offerInternal(element)
+    }
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt
new file mode 100644
index 0000000..ae98756
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
+import kotlinx.coroutines.experimental.selects.SelectInstance
+
+/**
+ * Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
+ * so that the receiver always gets the most recently sent element.
+ * Back-to-send sent elements are _conflated_ -- only the the most recently sent element is received,
+ * while previously sent elements **are lost**.
+ * Sender to this channel never suspends and [offer] always returns `true`.
+ *
+ * This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
+ *
+ * This implementation is fully lock-free.
+ */
+public open class ConflatedChannel<E> : AbstractChannel<E>() {
+    protected final override val isBufferAlwaysEmpty: Boolean get() = true
+    protected final override val isBufferEmpty: Boolean get() = true
+    protected final override val isBufferAlwaysFull: Boolean get() = false
+    protected final override val isBufferFull: Boolean get() = false
+
+    /**
+     * This implementation conflates last sent item when channel is closed.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    override fun onClosed(closed: Closed<E>) {
+        conflatePreviousSendBuffered(closed)
+    }
+
+    // result is always `OFFER_SUCCESS | Closed`
+    protected override fun offerInternal(element: E): Any {
+        while (true) {
+            val result = super.offerInternal(element)
+            when {
+                result === OFFER_SUCCESS -> return OFFER_SUCCESS
+                result === OFFER_FAILED -> { // try to buffer
+                    val sendResult = sendConflated(element)
+                    when (sendResult) {
+                        null -> return OFFER_SUCCESS
+                        is Closed<*> -> return sendResult
+                    }
+                    // otherwise there was receiver in queue, retry super.offerInternal
+                }
+                result is Closed<*> -> return result
+                else -> error("Invalid offerInternal result $result")
+            }
+        }
+    }
+
+    // result is always `ALREADY_SELECTED | OFFER_SUCCESS | Closed`.
+    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+        while (true) {
+            val result = if (hasReceiveOrClosed)
+                super.offerSelectInternal(element, select) else
+                (select.performAtomicTrySelect(describeSendConflated(element)) ?: OFFER_SUCCESS)
+            when {
+                result === ALREADY_SELECTED -> return ALREADY_SELECTED
+                result === OFFER_SUCCESS -> return OFFER_SUCCESS
+                result === OFFER_FAILED -> {} // retry
+                result is Closed<*> -> return result
+                else -> error("Invalid result $result")
+            }
+        }
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt
new file mode 100644
index 0000000..f2962ab
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
+import kotlinx.coroutines.experimental.selects.SelectInstance
+
+/**
+ * Channel with linked-list buffer of a unlimited capacity (limited only by available memory).
+ * Sender to this channel never suspends and [offer] always returns `true`.
+ *
+ * This channel is created by `Channel(Channel.UNLIMITED)` factory function invocation.
+ *
+ * This implementation is fully lock-free.
+ */
+public open class LinkedListChannel<E> : AbstractChannel<E>() {
+    protected final override val isBufferAlwaysEmpty: Boolean get() = true
+    protected final override val isBufferEmpty: Boolean get() = true
+    protected final override val isBufferAlwaysFull: Boolean get() = false
+    protected final override val isBufferFull: Boolean get() = false
+
+    // result is always `OFFER_SUCCESS | Closed`
+    protected override fun offerInternal(element: E): Any {
+        while (true) {
+            val result = super.offerInternal(element)
+            when {
+                result === OFFER_SUCCESS -> return OFFER_SUCCESS
+                result === OFFER_FAILED -> { // try to buffer
+                    val sendResult = sendBuffered(element)
+                    when (sendResult) {
+                        null -> return OFFER_SUCCESS
+                        is Closed<*> -> return sendResult
+                    }
+                    // otherwise there was receiver in queue, retry super.offerInternal
+                }
+                result is Closed<*> -> return result
+                else -> error("Invalid offerInternal result $result")
+            }
+        }
+    }
+
+    // result is always `ALREADY_SELECTED | OFFER_SUCCESS | Closed`.
+    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+        while (true) {
+            val result = if (hasReceiveOrClosed)
+                super.offerSelectInternal(element, select) else
+                (select.performAtomicTrySelect(describeSendBuffered(element)) ?: OFFER_SUCCESS)
+            when {
+                result === ALREADY_SELECTED -> return ALREADY_SELECTED
+                result === OFFER_SUCCESS -> return OFFER_SUCCESS
+                result === OFFER_FAILED -> {} // retry
+                result is Closed<*> -> return result
+                else -> error("Invalid result $result")
+            }
+        }
+    }
+}
+
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
new file mode 100644
index 0000000..fc86b0a
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * Scope for [produce] coroutine builder.
+ */
+public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
+    /**
+     * A reference to the channel that this coroutine [sends][send] elements to.
+     * It is provided for convenience, so that the code in the coroutine can refer
+     * to the channel as `channel` as apposed to `this`.
+     * All the [SendChannel] functions on this interface delegate to
+     * the channel instance returned by this function.
+     */
+    val channel: SendChannel<E>
+}
+
+/**
+ * @suppress **Deprecated**: Use `ReceiveChannel`.
+ */
+@Deprecated(message = "Use `ReceiveChannel`", replaceWith = ReplaceWith("ReceiveChannel"))
+@Suppress("MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_WHEN_NO_EXPLICIT_OVERRIDE")
+interface ProducerJob<out E> : ReceiveChannel<E>, Job {
+    @Deprecated(message = "Use ReceiveChannel itself")
+    val channel: ReceiveChannel<E>
+}
+
+/**
+ * Launches new coroutine to produce a stream of values by sending them to a channel
+ * and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
+ * object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
+ *
+ * The scope of the coroutine contains [ProducerScope] interface, which implements
+ * both [CoroutineScope] and [SendChannel], so that coroutine can invoke
+ * [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
+ * when the coroutine completes.
+ * The running coroutine is cancelled when its receive channel is [cancelled][ReceiveChannel.cancel].
+ *
+ * The [context] for the new coroutine can be explicitly specified.
+ * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
+ * The [coroutineContext] of the parent coroutine may be used,
+ * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
+ * The parent job may be also explicitly specified using [parent] parameter.
+ *
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ *
+ * Uncaught exceptions in this coroutine close the channel with this exception as a cause and
+ * the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
+ *
+ * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
+ *
+ * @param context context of the coroutine. The default value is [DefaultDispatcher].
+ * @param capacity capacity of the channel's buffer (no buffer by default).
+ * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
+ * @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
+ * @param block the coroutine code.
+ */
+public fun <E> produce(
+    context: CoroutineContext = DefaultDispatcher,
+    capacity: Int = 0,
+    parent: Job? = null,
+    onCompletion: CompletionHandler? = null,
+    block: suspend ProducerScope<E>.() -> Unit
+): ReceiveChannel<E> {
+    val channel = Channel<E>(capacity)
+    val newContext = newCoroutineContext(context, parent)
+    val coroutine = ProducerCoroutine(newContext, channel)
+    if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
+    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+    return coroutine
+}
+
+/** @suppress **Deprecated**: Binary compatibility */
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> produce(
+    context: CoroutineContext = DefaultDispatcher,
+    capacity: Int = 0,
+    parent: Job? = null,
+    block: suspend ProducerScope<E>.() -> Unit
+): ReceiveChannel<E> = produce(context, capacity, parent, block = block)
+
+/** @suppress **Deprecated**: Binary compatibility */
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> produce(
+    context: CoroutineContext = DefaultDispatcher,
+    capacity: Int = 0,
+    block: suspend ProducerScope<E>.() -> Unit
+): ProducerJob<E> =
+    produce(context, capacity, block = block) as ProducerJob<E>
+
+/**
+ * @suppress **Deprecated**: Renamed to `produce`.
+ */
+@Deprecated(message = "Renamed to `produce`", replaceWith = ReplaceWith("produce(context, capacity, block)"))
+public fun <E> buildChannel(
+    context: CoroutineContext,
+    capacity: Int = 0,
+    block: suspend ProducerScope<E>.() -> Unit
+): ProducerJob<E> =
+    produce(context, capacity, block = block) as ProducerJob<E>
+
+private class ProducerCoroutine<E>(
+    parentContext: CoroutineContext, channel: Channel<E>
+) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E>, ProducerJob<E> {
+    override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
+        val cause = exceptionally?.cause
+        val processed = when (exceptionally) {
+            is Cancelled -> _channel.cancel(cause) // producer coroutine was cancelled -- cancel channel
+            else -> _channel.close(cause) // producer coroutine has completed -- close channel
+        }
+        if (!processed && cause != null)
+            handleCoroutineException(context, cause)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
new file mode 100644
index 0000000..689e36f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+/**
+ * Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender
+ * to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
+ * until another coroutine invokes [receive] and [receive] suspends until another coroutine invokes [send].
+ *
+ * Use `Channel()` factory function to conveniently create an instance of rendezvous channel.
+ *
+ * This implementation is fully lock-free.
+ */
+public open class RendezvousChannel<E> : AbstractChannel<E>() {
+    protected final override val isBufferAlwaysEmpty: Boolean get() = true
+    protected final override val isBufferEmpty: Boolean get() = true
+    protected final override val isBufferAlwaysFull: Boolean get() = true
+    protected final override val isBufferFull: Boolean get() = true
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt
new file mode 100644
index 0000000..fe20163
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt
@@ -0,0 +1,6 @@
+package kotlinx.coroutines.experimental.internal
+
+/**
+ * Cross-platform array copy. Overlaps of source and destination are not supported
+ */
+expect fun <E> arraycopy(source: Array<E>, srcPos: Int, destination: Array<E?>, destinationStart: Int, length: Int)
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt
new file mode 100644
index 0000000..2b1f504
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt
@@ -0,0 +1,5 @@
+package kotlinx.coroutines.experimental.internal
+
+expect interface Closeable {
+    fun close()
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt
new file mode 100644
index 0000000..df36305
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt
@@ -0,0 +1,18 @@
+package kotlinx.coroutines.experimental.internal
+
+/**
+ * Special kind of list intended to be used as collection of subscribers in [ArrayBroadcastChannel]
+ * On JVM it's CopyOnWriteList and on JS it's MutableList.
+ *
+ * Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of ArrayBroadcastChannel
+ */
+typealias SubscribersList<E> = MutableList<E>
+
+expect fun <E> subscriberList(): SubscribersList<E>
+
+expect class ReentrantLock() {
+    fun tryLock(): Boolean
+    fun unlock(): Unit
+}
+
+expect inline fun <T> ReentrantLock.withLock(action: () -> T): T
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt
index 9cc3e14..e23fd98 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt
@@ -16,12 +16,12 @@
 
 package kotlinx.coroutines.experimental.internal
 
-import kotlin.jvm.*
-
 /** @suppress **This is unstable API and it is subject to change.** */
 public expect open class LockFreeLinkedListNode() {
     public val isRemoved: Boolean
+    public val next: Any
     public val nextNode: LockFreeLinkedListNode
+    public val prev: Any
     public val prevNode: LockFreeLinkedListNode
     public fun addLast(node: LockFreeLinkedListNode)
     public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean
@@ -57,11 +57,23 @@
     val queue: LockFreeLinkedListNode
     val node: T
     protected override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
+    override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
+}
+
+public expect open class RemoveFirstDesc<T>(queue: LockFreeLinkedListNode): AbstractAtomicDesc {
+    val queue: LockFreeLinkedListNode
+    public val result: T
+    protected open fun validatePrepared(node: T): Boolean
+    protected final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
+    final override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
 }
 
 /** @suppress **This is unstable API and it is subject to change.** */
 public expect abstract class AbstractAtomicDesc : AtomicDesc {
-    protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
     final override fun prepare(op: AtomicOp<*>): Any?
     final override fun complete(op: AtomicOp<*>, failure: Any?)
+    protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any?
+    protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean
+    protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
+    protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
 }
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
new file mode 100644
index 0000000..84da40f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ArrayBroadcastChannelTest : TestBase() {
+
+    @Test
+    fun testBasic() = runTest {
+        expect(1)
+        val broadcast = ArrayBroadcastChannel<Int>(1)
+        assertFalse(broadcast.isClosedForSend)
+        val first = broadcast.openSubscription()
+        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+            expect(2)
+            assertEquals(1, first.receive()) // suspends
+            assertFalse(first.isClosedForReceive)
+            expect(5)
+            assertEquals(2, first.receive()) // suspends
+            assertFalse(first.isClosedForReceive)
+            expect(10)
+            assertNull(first.receiveOrNull()) // suspends
+            assertTrue(first.isClosedForReceive)
+            expect(14)
+        }
+        expect(3)
+        broadcast.send(1)
+        expect(4)
+        yield() // to the first receiver
+        expect(6)
+
+        val second = broadcast.openSubscription()
+        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+            expect(7)
+            assertEquals(2, second.receive()) // suspends
+            assertFalse(second.isClosedForReceive)
+            expect(11)
+            assertNull(second.receiveOrNull()) // suspends
+            assertTrue(second.isClosedForReceive)
+            expect(15)
+        }
+        expect(8)
+        broadcast.send(2)
+        expect(9)
+        yield() // to first & second receivers
+        expect(12)
+        broadcast.close()
+        expect(13)
+        assertTrue(broadcast.isClosedForSend)
+        yield() // to first & second receivers
+        finish(16)
+    }
+
+    @Test
+    fun testSendSuspend() = runTest {
+        expect(1)
+        val broadcast = ArrayBroadcastChannel<Int>(1)
+        val first = broadcast.openSubscription()
+        launch(coroutineContext) {
+            expect(4)
+            assertEquals(1, first.receive())
+            expect(5)
+            assertEquals(2, first.receive())
+            expect(6)
+        }
+        expect(2)
+        broadcast.send(1) // puts to buffer, receiver not running yet
+        expect(3)
+        broadcast.send(2) // suspends
+        finish(7)
+    }
+
+    @Test
+    fun testConcurrentSendCompletion() = runTest {
+        expect(1)
+        val broadcast = ArrayBroadcastChannel<Int>(1)
+        val sub = broadcast.openSubscription()
+        // launch 3 concurrent senders (one goes buffer, two other suspend)
+        for (x in 1..3) {
+            launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+                expect(x + 1)
+                broadcast.send(x)
+            }
+        }
+        // and close it for send
+        expect(5)
+        broadcast.close()
+        // now must receive all 3 items
+        expect(6)
+        assertFalse(sub.isClosedForReceive)
+        for (x in 1..3)
+            assertEquals(x, sub.receiveOrNull())
+        // and receive close signal
+        assertNull(sub.receiveOrNull())
+        assertTrue(sub.isClosedForReceive)
+        finish(7)
+    }
+
+    @Test
+    fun testForgetUnsubscribed() = runTest {
+        expect(1)
+        val broadcast = ArrayBroadcastChannel<Int>(1)
+        broadcast.send(1)
+        broadcast.send(2)
+        broadcast.send(3)
+        expect(2) // should not suspend anywhere above
+        val sub = broadcast.openSubscription()
+        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+            expect(3)
+            assertEquals(4, sub.receive()) // suspends
+            expect(5)
+        }
+        expect(4)
+        broadcast.send(4) // sends
+        yield()
+        finish(6)
+    }
+
+    @Test
+    fun testReceiveFullAfterClose() = runTest {
+        val channel = BroadcastChannel<Int>(10)
+        val sub = channel.openSubscription()
+        // generate into buffer & close
+        for (x in 1..5) channel.send(x)
+        channel.close()
+        // make sure all of them are consumed
+        check(!sub.isClosedForReceive)
+        for (x in 1..5) check(sub.receive() == x)
+        check(sub.receiveOrNull() == null)
+        check(sub.isClosedForReceive)
+    }
+
+    @Test
+    fun testCloseSubDuringIteration() = runTest {
+        val channel = BroadcastChannel<Int>(1)
+        // launch generator (for later) in this context
+        launch(coroutineContext) {
+            for (x in 1..5) channel.send(x)
+            channel.close()
+        }
+        // start consuming
+        val sub = channel.openSubscription()
+        var expected = 0
+        sub.consumeEach {
+            check(it == ++expected)
+            if (it == 2) {
+                sub.close()
+            }
+        }
+        check(expected == 2)
+    }
+
+    @Test
+    fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) {
+        val channel = BroadcastChannel<Int>(1)
+        val sub = channel.openSubscription()
+        assertFalse(sub.isClosedForReceive)
+        sub.close()
+        assertTrue(sub.isClosedForReceive)
+        sub.receive()
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
new file mode 100644
index 0000000..61fdaef
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ArrayChannelTest : TestBase() {
+
+    @Test
+    fun testSimple() = runTest {
+        val q = ArrayChannel<Int>(1)
+        check(q.isEmpty && !q.isFull)
+        expect(1)
+        val sender = launch(coroutineContext) {
+            expect(4)
+            q.send(1) // success -- buffered
+            check(!q.isEmpty && q.isFull)
+            expect(5)
+            q.send(2) // suspends (buffer full)
+            expect(9)
+        }
+        expect(2)
+        val receiver = launch(coroutineContext) {
+            expect(6)
+            check(q.receive() == 1) // does not suspend -- took from buffer
+            check(!q.isEmpty && q.isFull) // waiting sender's element moved to buffer
+            expect(7)
+            check(q.receive() == 2) // does not suspend (takes from sender)
+            expect(8)
+        }
+        expect(3)
+        sender.join()
+        receiver.join()
+        check(q.isEmpty && !q.isFull)
+        finish(10)
+    }
+
+    @Test
+    fun testClosedBufferedReceiveOrNull() = runTest {
+        val q = ArrayChannel<Int>(1)
+        check(q.isEmpty && !q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+        expect(1)
+        launch(coroutineContext) {
+            expect(5)
+            check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+            assertEquals(42, q.receiveOrNull())
+            expect(6)
+            check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+            assertEquals(null, q.receiveOrNull())
+            expect(7)
+        }
+        expect(2)
+        q.send(42) // buffers
+        expect(3)
+        q.close() // goes on
+        expect(4)
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+        yield()
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+        finish(8)
+    }
+
+    @Test
+    fun testClosedExceptions() = runTest {
+        val q = ArrayChannel<Int>(1)
+        expect(1)
+        launch(coroutineContext) {
+            expect(4)
+            try { q.receive() }
+            catch (e: ClosedReceiveChannelException) {
+                expect(5)
+            }
+        }
+        expect(2)
+
+        require(q.close())
+        expect(3)
+        yield()
+        expect(6)
+        try { q.send(42) }
+        catch (e: ClosedSendChannelException) {
+            finish(7)
+        }
+    }
+
+    @Test
+    fun testOfferAndPool() = runTest {
+        val q = ArrayChannel<Int>(1)
+        assertTrue(q.offer(1))
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            assertEquals(1, q.poll())
+            expect(4)
+            assertEquals(null, q.poll())
+            expect(5)
+            assertEquals(2, q.receive()) // suspends
+            expect(9)
+            assertEquals(3, q.poll())
+            expect(10)
+            assertEquals(null, q.poll())
+            expect(11)
+        }
+        expect(2)
+        yield()
+        expect(6)
+        assertTrue(q.offer(2))
+        expect(7)
+        assertTrue(q.offer(3))
+        expect(8)
+        assertFalse(q.offer(4))
+        yield()
+        finish(12)
+    }
+
+    @Test
+    fun testConsumeAll() = runTest {
+        val q = ArrayChannel<Int>(5)
+        for (i in 1..10) {
+            if (i <= 5) {
+                expect(i)
+                q.send(i) // shall buffer
+            } else {
+                launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+                    expect(i)
+                    q.send(i) // suspends
+                    expectUnreached() // will get cancelled by cancel
+                }
+            }
+        }
+        expect(11)
+        q.cancel()
+        check(q.isClosedForSend)
+        check(q.isClosedForReceive)
+        check(q.receiveOrNull() == null)
+        finish(12)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt
new file mode 100644
index 0000000..2bbc4a1
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlin.test.*
+
+
+class BroadcastChannelFactoryTest {
+
+    @Test
+    fun testRendezvousChannelNotSupported() {
+        assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(0) }
+    }
+
+    @Test
+    fun testLinkedListChannelNotSupported() {
+        assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(Channel.UNLIMITED) }
+    }
+
+    @Test
+    fun testConflatedBroadcastChannel() {
+        assertTrue { BroadcastChannel<Int>(Channel.CONFLATED) is ConflatedBroadcastChannel }
+    }
+
+    @Test
+    fun testArrayBroadcastChannel() {
+        assertTrue { BroadcastChannel<Int>(1) is ArrayBroadcastChannel }
+        assertTrue { BroadcastChannel<Int>(10) is ArrayBroadcastChannel }
+    }
+
+    @Test
+    fun testInvalidCapacityNotSupported() {
+        assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(-2) }
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt
new file mode 100644
index 0000000..d4c5126
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.test.*
+
+
+class ChannelFactoryTest : TestBase() {
+
+    @Test
+    fun testRendezvousChannel() {
+        assertTrue(Channel<Int>() is RendezvousChannel)
+        assertTrue(Channel<Int>(0) is RendezvousChannel)
+    }
+
+    @Test
+    fun testLinkedListChannel() {
+        assertTrue(Channel<Int>(Channel.UNLIMITED) is LinkedListChannel)
+    }
+
+    @Test
+    fun testConflatedChannel() {
+        assertTrue(Channel<Int>(Channel.CONFLATED) is ConflatedChannel)
+    }
+
+    @Test
+    fun testArrayChannel() {
+        assertTrue(Channel<Int>(1) is ArrayChannel)
+        assertTrue(Channel<Int>(10) is ArrayChannel)
+    }
+
+    @Test
+    fun testInvalidCapacityNotSupported() = runTest({ it is IllegalArgumentException }) {
+        Channel<Int>(-2)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
new file mode 100644
index 0000000..0fe1fc9
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
@@ -0,0 +1,569 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.math.*
+import kotlin.test.*
+
+class ChannelsTest: TestBase() {
+    private val testList = listOf(1, 2, 3)
+
+    @Test
+    fun testIterableAsReceiveChannel() = runTest {
+        assertEquals(testList, testList.asReceiveChannel().toList())
+    }
+
+    @Test
+    fun testSequenceAsReceiveChannel() = runTest {
+        assertEquals(testList, testList.asSequence().asReceiveChannel().toList())
+    }
+
+    @Test
+    fun testAssociate() = runTest {
+        assertEquals(testList.associate { it * 2 to it * 3 },
+            testList.asReceiveChannel().associate { it * 2 to it * 3 }.toMap())
+    }
+
+    @Test
+    fun testAssociateBy() = runTest {
+        assertEquals(testList.associateBy { it % 2 }, testList.asReceiveChannel().associateBy { it % 2 })
+    }
+
+    @Test
+    fun testAssociateBy2() = runTest {
+        assertEquals(testList.associateBy({ it * 2}, { it * 3 }),
+            testList.asReceiveChannel().associateBy({ it * 2}, { it * 3 }).toMap())
+    }
+
+    @Test
+    fun testDistinct() = runTest {
+        assertEquals(testList.map { it % 2 }.distinct(), testList.asReceiveChannel().map { it % 2 }.distinct().toList())
+    }
+
+    @Test
+    fun testDistinctBy() = runTest {
+        assertEquals(testList.distinctBy { it % 2 }.toList(), testList.asReceiveChannel().distinctBy { it % 2 }.toList())
+    }
+
+    @Test
+    fun testToCollection() = runTest {
+        val target = mutableListOf<Int>()
+        testList.asReceiveChannel().toCollection(target)
+        assertEquals(testList, target)
+    }
+
+    @Test
+    fun testDrop() = runTest {
+        for (i in 0..testList.size) {
+            assertEquals(testList.drop(i), testList.asReceiveChannel().drop(i).toList(), "Drop $i")
+        }
+    }
+
+    @Test
+    fun testElementAtOrElse() = runTest {
+        assertEquals(testList.elementAtOrElse(2) { 42 }, testList.asReceiveChannel().elementAtOrElse(2) { 42 })
+        assertEquals(testList.elementAtOrElse(9) { 42 }, testList.asReceiveChannel().elementAtOrElse(9) { 42 })
+    }
+
+    @Test
+    fun testFirst() = runTest {
+        assertEquals(testList.first(), testList.asReceiveChannel().first())
+        for (i in testList) {
+            assertEquals(testList.first { it == i }, testList.asReceiveChannel().first { it == i })
+        }
+        try {
+            testList.asReceiveChannel().first { it == 9 }
+            fail()
+        } catch (nse: NoSuchElementException) {
+        }
+    }
+
+    @Test
+    fun testFirstOrNull() = runTest {
+        assertEquals(testList.firstOrNull(), testList.asReceiveChannel().firstOrNull())
+        assertEquals(testList.firstOrNull { it == 2 }, testList.asReceiveChannel().firstOrNull { it == 2 })
+        assertEquals(testList.firstOrNull { it == 9 }, testList.asReceiveChannel().firstOrNull { it == 9 })
+    }
+
+    @Test
+    fun testFlatMap() = runTest {
+        assertEquals(testList.flatMap { (0..it).toList() }, testList.asReceiveChannel().flatMap { (0..it).asReceiveChannel() }.toList())
+
+    }
+
+    @Test
+    fun testFold() = runTest {
+        assertEquals(testList.fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } },
+            testList.asReceiveChannel().fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } }.toList())
+    }
+
+    @Test
+    fun testFoldIndexed() = runTest {
+        assertEquals(testList.foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } },
+            testList.asReceiveChannel().foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } }.toList())
+    }
+
+    @Test
+    fun testGroupBy() = runTest {
+        assertEquals(testList.groupBy { it % 2 }, testList.asReceiveChannel().groupBy { it % 2 })
+    }
+
+    @Test
+    fun testGroupBy2() = runTest {
+        assertEquals(testList.groupBy({ -it }, { it + 100 }), testList.asReceiveChannel().groupBy({ -it }, { it + 100 }).toMap())
+
+    }
+
+    @Test
+    fun testMap() = runTest {
+        assertEquals(testList.map { it + 10 }, testList.asReceiveChannel().map { it + 10 }.toList())
+
+    }
+
+    @Test
+    fun testMapToCollection() = runTest {
+        val c = mutableListOf<Int>()
+        testList.asReceiveChannel().mapTo(c) { it + 10 }
+        assertEquals(testList.map { it + 10 }, c)
+    }
+
+    @Test
+    fun testMapToSendChannel() = runTest {
+        val c = produce<Int> {
+            testList.asReceiveChannel().mapTo(channel) { it + 10 }
+        }
+        assertEquals(testList.map { it + 10 }, c.toList())
+    }
+
+    @Test
+    fun testEmptyList() = runTest {
+        assertTrue(emptyList<Nothing>().asReceiveChannel().toList().isEmpty())
+    }
+
+    @Test
+    fun testToList() = runTest {
+        assertEquals(testList, testList.asReceiveChannel().toList())
+
+    }
+
+    @Test
+    fun testEmptySet() = runTest {
+        assertTrue(emptyList<Nothing>().asReceiveChannel().toSet().isEmpty())
+
+    }
+
+    @Test
+    fun testToSet() = runTest {
+        assertEquals(testList.toSet(), testList.asReceiveChannel().toSet())
+    }
+
+    @Test
+    fun testToMutableSet() = runTest {
+        assertEquals(testList.toMutableSet(), testList.asReceiveChannel().toMutableSet())
+    }
+
+    @Test
+    fun testEmptySequence() = runTest {
+        val channel = Channel<Nothing>()
+        channel.close()
+
+        assertTrue(emptyList<Nothing>().asReceiveChannel().count() == 0)
+    }
+
+    @Test
+    fun testEmptyMap() = runTest {
+        val channel = Channel<Pair<Nothing, Nothing>>()
+        channel.close()
+
+        assertTrue(channel.toMap().isEmpty())
+    }
+
+    @Test
+    fun testToMap() = runTest {
+        val values = testList.map { it to it.toString() }
+        assertEquals(values.toMap(), values.asReceiveChannel().toMap())
+    }
+
+    @Test
+    fun testReduce() = runTest {
+        assertEquals(testList.reduce { acc, e -> acc * e },
+            testList.asReceiveChannel().reduce { acc, e -> acc * e })
+    }
+
+    @Test
+    fun testReduceIndexed() = runTest {
+        assertEquals(testList.reduceIndexed { index, acc, e -> index + acc * e },
+            testList.asReceiveChannel().reduceIndexed { index, acc, e -> index + acc * e })
+    }
+
+    @Test
+    fun testTake() = runTest {
+        for (i in 0..testList.size) {
+            assertEquals(testList.take(i), testList.asReceiveChannel().take(i).toList())
+        }
+    }
+
+    @Test
+    fun testPartition() = runTest {
+        assertEquals(testList.partition { it % 2 == 0 }, testList.asReceiveChannel().partition { it % 2 == 0 })
+    }
+
+    @Test
+    fun testZip() = runTest {
+        val other = listOf("a", "b")
+        assertEquals(testList.zip(other), testList.asReceiveChannel().zip(other.asReceiveChannel()).toList())
+    }
+
+    @Test
+    fun testElementAt() = runTest {
+        testList.indices.forEach { i ->
+            assertEquals(testList[i], testList.asReceiveChannel().elementAt(i))
+        }
+    }
+
+    @Test
+    fun testElementAtOrNull() = runTest {
+        testList.indices.forEach { i ->
+            assertEquals(testList[i], testList.asReceiveChannel().elementAtOrNull(i))
+        }
+        assertEquals(null, testList.asReceiveChannel().elementAtOrNull(-1))
+        assertEquals(null, testList.asReceiveChannel().elementAtOrNull(testList.size))
+    }
+
+    @Test
+    fun testFind() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.find { it % 2 == mod },
+                testList.asReceiveChannel().find { it % 2 == mod })
+        }
+    }
+
+    @Test
+    fun testFindLast() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.findLast { it % 2 == mod }, testList.asReceiveChannel().findLast { it % 2 == mod })
+        }
+    }
+
+    @Test
+    fun testIndexOf() = runTest {
+        repeat(testList.size + 1) { i ->
+            assertEquals(testList.indexOf(i), testList.asReceiveChannel().indexOf(i))
+        }
+    }
+
+    @Test
+    fun testLastIndexOf() = runTest {
+        repeat(testList.size + 1) { i ->
+            assertEquals(testList.lastIndexOf(i), testList.asReceiveChannel().lastIndexOf(i))
+        }
+    }
+
+    @Test
+    fun testIndexOfFirst() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.indexOfFirst { it % 2 == mod },
+                testList.asReceiveChannel().indexOfFirst { it % 2 == mod })
+        }
+    }
+
+    @Test
+    fun testIndexOfLast() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.indexOfLast { it % 2 != mod },
+                testList.asReceiveChannel().indexOfLast { it % 2 != mod })
+        }
+    }
+
+    @Test
+    fun testLastOrNull() = runTest {
+        assertEquals(testList.lastOrNull(), testList.asReceiveChannel().lastOrNull())
+        assertEquals(null, emptyList<Int>().asReceiveChannel().lastOrNull())
+    }
+
+    @Test
+    fun testSingleOrNull() = runTest {
+        assertEquals(1, listOf(1).asReceiveChannel().singleOrNull())
+        assertEquals(null, listOf(1, 2).asReceiveChannel().singleOrNull())
+        assertEquals(null, emptyList<Int>().asReceiveChannel().singleOrNull())
+        repeat(testList.size + 1) { i ->
+            assertEquals(testList.singleOrNull { it == i },
+                testList.asReceiveChannel().singleOrNull { it == i })
+        }
+        repeat(3) { mod ->
+            assertEquals(testList.singleOrNull { it % 2 == mod },
+                testList.asReceiveChannel().singleOrNull { it % 2 == mod })
+        }
+    }
+
+    @Test
+    fun testDropWhile() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.dropWhile { it % 2 == mod },
+                testList.asReceiveChannel().dropWhile { it % 2 == mod }.toList())
+        }
+    }
+
+    @Test
+    fun testFilter() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.filter { it % 2 == mod },
+                testList.asReceiveChannel().filter { it % 2 == mod }.toList())
+        }
+    }
+
+    @Test
+    fun testFilterToCollection() = runTest {
+        repeat(3) { mod ->
+            val c = mutableListOf<Int>()
+            testList.asReceiveChannel().filterTo(c) { it % 2 == mod }
+            assertEquals(testList.filter { it % 2 == mod }, c)
+        }
+    }
+
+    @Test
+    fun testFilterToSendChannel() = runTest {
+        repeat(3) { mod ->
+            val c = produce<Int> {
+                testList.asReceiveChannel().filterTo(channel) { it % 2 == mod }
+            }
+            assertEquals(testList.filter { it % 2 == mod }, c.toList())
+        }
+    }
+
+    @Test
+    fun testFilterNot() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.filterNot { it % 2 == mod },
+                testList.asReceiveChannel().filterNot { it % 2 == mod }.toList())
+        }
+    }
+
+    @Test
+    fun testFilterNotToCollection() = runTest {
+        repeat(3) { mod ->
+            val c = mutableListOf<Int>()
+            testList.asReceiveChannel().filterNotTo(c) { it % 2 == mod }
+            assertEquals(testList.filterNot { it % 2 == mod }, c)
+        }
+    }
+
+    @Test
+    fun testFilterNotToSendChannel() = runTest {
+        repeat(3) { mod ->
+            val c = produce<Int> {
+                testList.asReceiveChannel().filterNotTo(channel) { it % 2 == mod }
+            }
+            assertEquals(testList.filterNot { it % 2 == mod }, c.toList())
+        }
+    }
+
+    @Test
+    fun testFilterNotNull() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(),
+                testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNull().toList())
+        }
+    }
+
+    @Test
+    fun testFilterNotNullToCollection() = runTest {
+        repeat(3) { mod ->
+            val c = mutableListOf<Int>()
+            testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(c)
+            assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(), c)
+        }
+    }
+
+    @Test
+    fun testFilterNotNullToSendChannel() = runTest {
+        repeat(3) { mod ->
+            val c = produce<Int> {
+                testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(channel)
+            }
+            assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(), c.toList())
+        }
+    }
+
+    @Test
+    fun testFilterIndexed() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.filterIndexed { index, _ ->  index % 2 == mod },
+                testList.asReceiveChannel().filterIndexed { index, _ ->  index % 2 == mod }.toList())
+        }
+    }
+
+    @Test
+    fun testFilterIndexedToCollection() = runTest {
+        repeat(3) { mod ->
+            val c = mutableListOf<Int>()
+            testList.asReceiveChannel().filterIndexedTo(c) { index, _ ->  index % 2 == mod }
+            assertEquals(testList.filterIndexed { index, _ ->  index % 2 == mod }, c)
+        }
+    }
+
+    @Test
+    fun testFilterIndexedToChannel() = runTest {
+        repeat(3) { mod ->
+            val c = produce<Int> {
+                testList.asReceiveChannel().filterIndexedTo(channel) { index, _ -> index % 2 == mod }
+            }
+            assertEquals(testList.filterIndexed { index, _ ->  index % 2 == mod }, c.toList())
+        }
+    }
+
+    @Test
+    fun testTakeWhile() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.takeWhile { it % 2 != mod },
+                testList.asReceiveChannel().takeWhile { it % 2 != mod }.toList())
+        }
+    }
+
+    @Test
+    fun testToChannel() = runTest {
+        val c = produce<Int> {
+            testList.asReceiveChannel().toChannel(channel)
+        }
+        assertEquals(testList, c.toList())
+    }
+
+    @Test
+    fun testMapIndexed() = runTest {
+        assertEquals(testList.mapIndexed { index, i -> index + i },
+            testList.asReceiveChannel().mapIndexed { index, i -> index + i }.toList())
+    }
+
+    @Test
+    fun testMapIndexedToCollection() = runTest {
+        val c = mutableListOf<Int>()
+        testList.asReceiveChannel().mapIndexedTo(c) { index, i -> index + i }
+        assertEquals(testList.mapIndexed { index, i -> index + i }, c)
+    }
+
+    @Test
+    fun testMapIndexedToSendChannel() = runTest {
+        val c = produce<Int> {
+            testList.asReceiveChannel().mapIndexedTo(channel) { index, i -> index + i }
+        }
+        assertEquals(testList.mapIndexed { index, i -> index + i }, c.toList())
+    }
+
+    @Test
+    fun testMapNotNull() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } },
+                testList.asReceiveChannel().mapNotNull { i -> i.takeIf { i % 2 == mod } }.toList())
+        }
+    }
+
+    @Test
+    fun testMapNotNullToCollection() = runTest {
+        repeat(3) { mod ->
+            val c = mutableListOf<Int>()
+            testList.asReceiveChannel().mapNotNullTo(c) { i -> i.takeIf { i % 2 == mod } }
+            assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, c)
+        }
+    }
+
+    @Test
+    fun testMapNotNullToSendChannel() = runTest {
+        repeat(3) { mod ->
+            val c = produce<Int> {
+                testList.asReceiveChannel().mapNotNullTo(channel) { i -> i.takeIf { i % 2 == mod } }
+            }
+            assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, c.toList())
+        }
+    }
+
+    @Test
+    fun testMapIndexedNotNull() = runTest {
+        repeat(3) { mod ->
+            assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } },
+                testList.asReceiveChannel().mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }.toList())
+        }
+    }
+
+    @Test
+    fun testMapIndexedNotNullToCollection() = runTest {
+        repeat(3) { mod ->
+            val c = mutableListOf<Int>()
+            testList.asReceiveChannel().mapIndexedNotNullTo(c) { index, i -> index.takeIf { i % 2 == mod } }
+            assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, c)
+        }
+    }
+
+    @Test
+    fun testMapIndexedNotNullToSendChannel() = runTest {
+        repeat(3) { mod ->
+            val c = produce<Int> {
+                testList.asReceiveChannel().mapIndexedNotNullTo(channel) { index, i -> index.takeIf { i % 2 == mod } }
+            }
+            assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, c.toList())
+        }
+    }
+
+    @Test
+    fun testWithIndex() = runTest {
+        assertEquals(testList.withIndex().toList(), testList.asReceiveChannel().withIndex().toList())
+    }
+
+    @Test
+    fun testMaxBy() = runTest {
+        assertEquals(testList.maxBy { 10 - abs(it - 2) },
+            testList.asReceiveChannel().maxBy { 10 - abs(it - 2) })
+    }
+
+    @Test
+    fun testMaxWith() = runTest {
+        val cmp = compareBy<Int> { 10 - abs(it - 2) }
+        assertEquals(testList.maxWith(cmp),
+            testList.asReceiveChannel().maxWith(cmp))
+    }
+
+    @Test
+    fun testMinBy() = runTest {
+        assertEquals(testList.minBy { abs(it - 2) },
+            testList.asReceiveChannel().minBy { abs(it - 2) })
+    }
+
+    @Test
+    fun testMinWith() = runTest {
+        val cmp = compareBy<Int> { abs(it - 2) }
+        assertEquals(testList.minWith(cmp),
+            testList.asReceiveChannel().minWith(cmp))
+    }
+
+    @Test
+    fun testSumBy() = runTest {
+        assertEquals(testList.sumBy { it * 3 },
+            testList.asReceiveChannel().sumBy { it * 3 })
+    }
+
+    @Test
+    fun testSumByDouble() = runTest {
+        val expected = testList.sumByDouble { it * 3.0 }
+        val actual = testList.asReceiveChannel().sumByDouble { it * 3.0 }
+        assertEquals(expected, actual)
+    }
+
+    @Test
+    fun testRequireNoNulls() = runTest {
+        assertEquals(testList.requireNoNulls(), testList.asReceiveChannel().requireNoNulls().toList())
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt
new file mode 100644
index 0000000..4c04f8f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ConflatedBroadcastChannelTest : TestBase() {
+
+    @Test
+    fun testBasicScenario() = runTest {
+        expect(1)
+        val broadcast = ConflatedBroadcastChannel<String>()
+        assertTrue(exceptionFromNotInline { broadcast.value } is IllegalStateException)
+        assertNull(broadcast.valueOrNull)
+
+        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+            expect(2)
+            val sub = broadcast.openSubscription()
+            assertNull(sub.poll())
+            expect(3)
+            assertEquals("one", sub.receive()) // suspends
+            expect(6)
+            assertEquals("two", sub.receive()) // suspends
+            expect(12)
+            sub.close()
+            expect(13)
+        }
+
+        expect(4)
+        broadcast.send("one") // does not suspend
+        assertEquals("one", broadcast.value)
+        assertEquals("one", broadcast.valueOrNull)
+        expect(5)
+        yield() // to receiver
+        expect(7)
+        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+            expect(8)
+            val sub = broadcast.openSubscription()
+            assertEquals("one", sub.receive()) // does not suspend
+            expect(9)
+            assertEquals("two", sub.receive()) // suspends
+            expect(14)
+            assertEquals("three", sub.receive()) // suspends
+            expect(17)
+            assertNull(sub.receiveOrNull()) // suspends until closed
+            expect(20)
+            sub.close()
+            expect(21)
+        }
+
+        expect(10)
+        broadcast.send("two") // does not suspend
+        assertEquals("two", broadcast.value)
+        assertEquals("two", broadcast.valueOrNull)
+        expect(11)
+        yield() // to both receivers
+        expect(15)
+        broadcast.send("three") // does not suspend
+        assertEquals("three", broadcast.value)
+        assertEquals("three", broadcast.valueOrNull)
+        expect(16)
+        yield() // to second receiver
+        expect(18)
+        broadcast.close()
+        assertTrue(exceptionFromNotInline { broadcast.value } is IllegalStateException)
+        assertNull(broadcast.valueOrNull)
+        expect(19)
+        yield() // to second receiver
+        assertTrue(exceptionFrom { broadcast.send("four") } is ClosedSendChannelException)
+        finish(22)
+    }
+
+    @Test
+    fun testInitialValueAndReceiveClosed() = runTest {
+        expect(1)
+        val broadcast = ConflatedBroadcastChannel(1)
+        assertEquals(1, broadcast.value)
+        assertEquals(1, broadcast.valueOrNull)
+        launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+            expect(2)
+            val sub = broadcast.openSubscription()
+            assertEquals(1, sub.receive())
+            expect(3)
+            assertTrue(exceptionFrom { sub.receive() } is ClosedReceiveChannelException) // suspends
+            expect(6)
+        }
+        expect(4)
+        broadcast.close()
+        expect(5)
+        yield() // to child
+        finish(7)
+    }
+
+    inline fun exceptionFrom(block: () -> Unit): Throwable? {
+        try {
+            block()
+            return null
+        } catch (e: Throwable) {
+            return e
+        }
+    }
+
+    // Ugly workaround for bug in JS compiler
+    fun exceptionFromNotInline(block: () -> Unit): Throwable? {
+        try {
+            block()
+            return null
+        } catch (e: Throwable) {
+            return e
+        }
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
new file mode 100644
index 0000000..1fd7413
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ConflatedChannelTest : TestBase() {
+
+    @Test
+    fun testBasicConflationOfferPoll() {
+        val q = ConflatedChannel<Int>()
+        assertNull(q.poll())
+        assertTrue(q.offer(1))
+        assertTrue(q.offer(2))
+        assertTrue(q.offer(3))
+        assertEquals(3, q.poll())
+        assertNull(q.poll())
+    }
+
+    @Test
+    fun testConflatedSend() = runTest {
+        val q = ConflatedChannel<Int>()
+        q.send(1)
+        q.send(2) // shall conflated previously sent
+        assertEquals(2, q.receiveOrNull())
+    }
+
+    @Test
+    fun testConflatedClose() = runTest {
+        val q = ConflatedChannel<Int>()
+        q.send(1)
+        q.close() // shall conflate sent item and become closed
+        assertNull(q.receiveOrNull())
+    }
+
+    @Test
+    fun testConflationSendReceive() = runTest {
+        val q = ConflatedChannel<Int>()
+        expect(1)
+        launch(coroutineContext) { // receiver coroutine
+            expect(4)
+            assertEquals(2, q.receive())
+            expect(5)
+            assertEquals(3, q.receive()) // this receive suspends
+            expect(8)
+            assertEquals(6, q.receive()) // last conflated value
+            expect(9)
+        }
+        expect(2)
+        q.send(1)
+        q.send(2) // shall conflate
+        expect(3)
+        yield() // to receiver
+        expect(6)
+        q.send(3) // send to the waiting receiver
+        q.send(4) // buffer
+        q.send(5) // conflate
+        q.send(6) // conflate again
+        expect(7)
+        yield() // to receiver
+        finish(10)
+    }
+
+    @Test
+    fun testConsumeAll() = runTest {
+        val q = ConflatedChannel<Int>()
+        expect(1)
+        for (i in 1..10) {
+            q.send(i) // stores as last
+        }
+        q.cancel()
+        check(q.isClosedForSend)
+        check(q.isClosedForReceive)
+        check(q.receiveOrNull() == null)
+        finish(2)
+    }
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
new file mode 100644
index 0000000..897801e
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.TestBase
+import kotlin.test.*
+
+class LinkedListChannelTest : TestBase() {
+
+    @Test
+    fun testBasic() = runTest {
+        val c = LinkedListChannel<Int>()
+        c.send(1)
+        check(c.offer(2))
+        c.send(3)
+        check(c.close())
+        check(!c.close())
+        assertEquals(1, c.receive())
+        assertEquals(2, c.poll())
+        assertEquals(3, c.receiveOrNull())
+        assertNull(c.receiveOrNull())
+    }
+
+    @Test
+    fun testConsumeAll() = runTest {
+        val q = LinkedListChannel<Int>()
+        for (i in 1..10) {
+            q.send(i) // buffers
+        }
+        q.cancel()
+        check(q.isClosedForSend)
+        check(q.isClosedForReceive)
+        check(q.receiveOrNull() == null)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt
new file mode 100644
index 0000000..6646aa6
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt
@@ -0,0 +1,55 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ProduceConsumeTest : TestBase() {
+
+    @Test
+    fun testRendezvous() = runTest {
+        testProducer(1)
+    }
+
+    @Test
+    fun testSmallBuffer() = runTest {
+        testProducer(1)
+    }
+
+    @Test
+    fun testMediumBuffer() = runTest {
+        testProducer(10)
+    }
+
+    @Test
+    fun testLargeMediumBuffer() = runTest {
+        testProducer(1000)
+    }
+
+    @Test
+    fun testUnlimited() = runTest {
+        testProducer(Channel.UNLIMITED)
+    }
+
+    private suspend fun testProducer(producerCapacity: Int) {
+        testProducer(1, producerCapacity)
+        testProducer(10, producerCapacity)
+        testProducer(100, producerCapacity)
+    }
+
+    private suspend fun testProducer(messages: Int, producerCapacity: Int) {
+        var sentAll = false
+        val producer = produce(coroutineContext, capacity = producerCapacity) {
+            for (i in 1..messages) {
+                send(i)
+            }
+            sentAll = true
+        }
+        var consumed = 0
+        for (x in producer) {
+            consumed++
+        }
+        assertTrue(sentAll)
+        assertEquals(messages, consumed)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
new file mode 100644
index 0000000..522f6d6
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ProduceTest : TestBase() {
+
+    @Test
+    fun testBasic() = runTest {
+        val c = produce(coroutineContext) {
+            expect(2)
+            send(1)
+            expect(3)
+            send(2)
+            expect(6)
+        }
+        expect(1)
+        check(c.receive() == 1)
+        expect(4)
+        check(c.receive() == 2)
+        expect(5)
+        check(c.receiveOrNull() == null)
+        finish(7)
+    }
+
+    @Test
+    fun testCancel() = runTest {
+        val c = produce(coroutineContext) {
+            expect(2)
+            send(1)
+            expect(3)
+            try {
+                send(2) // will get cancelled
+            } catch (e: Throwable) {
+                finish(7)
+                check(e is JobCancellationException && e.job == coroutineContext[Job])
+                throw e
+            }
+            expectUnreached()
+        }
+        expect(1)
+        check(c.receive() == 1)
+        expect(4)
+        c.cancel()
+        expect(5)
+        check(c.receiveOrNull() == null)
+        expect(6)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
new file mode 100644
index 0000000..6e1b2c3
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class RendezvousChannelTest : TestBase() {
+
+    @Test
+    fun testSimple() = runTest {
+        val q = RendezvousChannel<Int>()
+        check(q.isEmpty && q.isFull)
+        expect(1)
+        val sender = launch(coroutineContext) {
+            expect(4)
+            q.send(1) // suspend -- the first to come to rendezvous
+            expect(7)
+            q.send(2) // does not suspend -- receiver is there
+            expect(8)
+        }
+        expect(2)
+        val receiver = launch(coroutineContext) {
+            expect(5)
+            check(q.receive() == 1) // does not suspend -- sender was there
+            expect(6)
+            check(q.receive() == 2) // suspends
+            expect(9)
+        }
+        expect(3)
+        sender.join()
+        receiver.join()
+        check(q.isEmpty && q.isFull)
+        finish(10)
+    }
+
+    @Test
+    fun testClosedReceiveOrNull() = runTest {
+        val q = RendezvousChannel<Int>()
+        check(q.isEmpty && q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            assertEquals(42, q.receiveOrNull())
+            expect(4)
+            assertEquals(null, q.receiveOrNull())
+            expect(6)
+        }
+        expect(2)
+        q.send(42)
+        expect(5)
+        q.close()
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+        yield()
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+        finish(7)
+    }
+
+    @Test
+    fun testClosedExceptions() = runTest {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(4)
+            try { q.receive() }
+            catch (e: ClosedReceiveChannelException) {
+                expect(5)
+            }
+        }
+        expect(2)
+        q.close()
+        expect(3)
+        yield()
+        expect(6)
+        try { q.send(42) }
+        catch (e: ClosedSendChannelException) {
+            finish(7)
+        }
+    }
+
+    @Test
+    fun testOfferAndPool() = runTest {
+        val q = RendezvousChannel<Int>()
+        assertFalse(q.offer(1))
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            assertEquals(null, q.poll())
+            expect(4)
+            assertEquals(2, q.receive())
+            expect(7)
+            assertEquals(null, q.poll())
+            yield()
+            expect(9)
+            assertEquals(3, q.poll())
+            expect(10)
+        }
+        expect(2)
+        yield()
+        expect(5)
+        assertTrue(q.offer(2))
+        expect(6)
+        yield()
+        expect(8)
+        q.send(3)
+        finish(11)
+    }
+
+    @Test
+    fun testIteratorClosed() = runTest {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            q.close()
+            expect(4)
+        }
+        expect(2)
+        for (x in q) {
+            expectUnreached()
+        }
+        finish(5)
+    }
+
+    @Test
+    fun testIteratorOne() = runTest {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            q.send(1)
+            expect(4)
+            q.close()
+            expect(5)
+        }
+        expect(2)
+        for (x in q) {
+            expect(6)
+            assertEquals(1, x)
+        }
+        finish(7)
+    }
+
+    @Test
+    fun testIteratorOneWithYield() = runTest {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            q.send(1) // will suspend
+            expect(6)
+            q.close()
+            expect(7)
+        }
+        expect(2)
+        yield() // yield to sender coroutine right before starting for loop
+        expect(4)
+        for (x in q) {
+            expect(5)
+            assertEquals(1, x)
+        }
+        finish(8)
+    }
+
+    @Test
+    fun testIteratorTwo() = runTest {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            q.send(1)
+            expect(4)
+            q.send(2)
+            expect(7)
+            q.close()
+            expect(8)
+        }
+        expect(2)
+        for (x in q) {
+            when (x) {
+                1 -> expect(5)
+                2 -> expect(6)
+                else -> expectUnreached()
+            }
+        }
+        finish(9)
+    }
+
+    @Test
+    fun testIteratorTwoWithYield() = runTest {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(3)
+            q.send(1) // will suspend
+            expect(6)
+            q.send(2)
+            expect(7)
+            q.close()
+            expect(8)
+        }
+        expect(2)
+        yield() // yield to sender coroutine right before starting for loop
+        expect(4)
+        for (x in q) {
+            when (x) {
+                1 -> expect(5)
+                2 -> expect(9)
+                else -> expectUnreached()
+            }
+        }
+        finish(10)
+    }
+
+    @Test
+    fun testSuspendSendOnClosedChannel() = runTest {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(coroutineContext) {
+            expect(4)
+            q.send(42) // suspend
+            expect(11)
+        }
+        expect(2)
+        launch(coroutineContext) {
+            expect(5)
+            q.close()
+            expect(6)
+        }
+        expect(3)
+        yield() // to sender
+        expect(7)
+        yield() // try to resume sender (it will not resume despite the close!)
+        expect(8)
+        assertEquals(42, q.receiveOrNull())
+        expect(9)
+        assertNull(q.receiveOrNull())
+        expect(10)
+        yield() // to sender, it was resumed!
+        finish(12)
+    }
+
+    class BadClass {
+        override fun equals(other: Any?): Boolean = error("equals")
+        override fun hashCode(): Int = error("hashCode")
+        override fun toString(): String = error("toString")
+    }
+
+    @Test
+    fun testProduceBadClass() = runTest {
+        val bad = BadClass()
+        val c = produce(coroutineContext) {
+            expect(1)
+            send(bad)
+        }
+        assertTrue(c.receive() === bad)
+        finish(2)
+    }
+
+    @Test
+    fun testConsumeAll() = runTest {
+        val q = RendezvousChannel<Int>()
+        for (i in 1..10) {
+            launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+                expect(i)
+                q.send(i) // suspends
+                expectUnreached() // will get cancelled by cancel
+            }
+        }
+        expect(11)
+        q.cancel()
+        check(q.isClosedForSend)
+        check(q.isClosedForReceive)
+        check(q.receiveOrNull() == null)
+        finish(12)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt
new file mode 100644
index 0000000..c85f541
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt
@@ -0,0 +1,46 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class SendReceiveStressTest : TestBase() {
+
+    // Emulate parametrized by hand :(
+
+    @Test
+    fun testArrayChannel() = runTest {
+        testStress(ArrayChannel(2))
+    }
+
+    @Test
+    fun testLinkedListChannel() = runTest {
+        testStress(LinkedListChannel())
+    }
+
+    @Test
+    fun testRendezvousChannel() = runTest {
+        testStress(RendezvousChannel())
+    }
+
+    private suspend fun testStress(channel: Channel<Int>) {
+        val n = 1_000 // Do not increase, otherwise node.js will fail with timeout :(
+        val sender = launch(coroutineContext) {
+            for (i in 1..n) {
+                channel.send(i)
+            }
+            expect(2)
+        }
+        val receiver = launch(coroutineContext) {
+            for (i in 1..n) {
+                val next = channel.receive()
+                check(next == i)
+            }
+            expect(3)
+        }
+        expect(1)
+        sender.join()
+        receiver.join()
+        finish(4)
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
new file mode 100644
index 0000000..69e939f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
@@ -0,0 +1,35 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class SimpleSendReceiveTest : TestBase() {
+
+    @Test
+    fun testSimpleSendReceive() = runTest {
+        // Parametrized common test :(
+        TestChannelKind.values().forEach { kind -> testSendReceive(kind, 100) }
+    }
+
+    private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) {
+        val channel = kind.create()
+
+        launch(coroutineContext) {
+            repeat(iterations) { channel.send(it) }
+            channel.close()
+        }
+        var expected = 0
+        for (x in channel) {
+            if (!kind.isConflated) {
+                assertEquals(expected++, x)
+            } else {
+                assertTrue(x >= expected)
+                expected = x + 1
+            }
+        }
+        if (!kind.isConflated) {
+            assertEquals(iterations, expected)
+        }
+    }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
new file mode 100644
index 0000000..60dbb97
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+enum class TestBroadcastChannelKind {
+    ARRAY_1 {
+        override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel(1)
+        override fun toString(): String = "ArrayBroadcastChannel(1)"
+    },
+    ARRAY_10 {
+        override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel(10)
+        override fun toString(): String = "ArrayBroadcastChannel(10)"
+    },
+    CONFLATED {
+        override fun <T> create(): BroadcastChannel<T> = ConflatedBroadcastChannel()
+        override fun toString(): String = "ConflatedBroadcastChannel"
+        override val isConflated: Boolean get() = true
+    }
+    ;
+
+    abstract fun <T> create(): BroadcastChannel<T>
+    open val isConflated: Boolean get() = false
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
new file mode 100644
index 0000000..c3ac904
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.selects.SelectClause1
+
+enum class TestChannelKind {
+    RENDEZVOUS {
+        override fun create(): Channel<Int> = RendezvousChannel()
+        override fun toString(): String = "RendezvousChannel"
+    },
+    ARRAY_1 {
+        override fun create(): Channel<Int> = ArrayChannel(1)
+        override fun toString(): String = "ArrayChannel(1)"
+    },
+    ARRAY_10 {
+        override fun create(): Channel<Int> = ArrayChannel(8)
+        override fun toString(): String = "ArrayChannel(8)"
+    },
+    LINKED_LIST {
+        override fun create(): Channel<Int> = LinkedListChannel()
+        override fun toString(): String = "LinkedListChannel"
+    },
+    CONFLATED {
+        override fun create(): Channel<Int> = ConflatedChannel()
+        override fun toString(): String = "ConflatedChannel"
+        override val isConflated: Boolean get() = true
+    },
+    ARRAY_BROADCAST_1 {
+        override fun create(): Channel<Int> = ChannelViaBroadcast(ArrayBroadcastChannel<Int>(1))
+        override fun toString(): String = "ArrayBroadcastChannel(1)"
+    },
+    ARRAY_BROADCAST_10 {
+        override fun create(): Channel<Int> = ChannelViaBroadcast(ArrayBroadcastChannel<Int>(10))
+        override fun toString(): String = "ArrayBroadcastChannel(10)"
+    },
+    CONFLATED_BROADCAST {
+        override fun create(): Channel<Int> = ChannelViaBroadcast(ConflatedBroadcastChannel<Int>())
+        override fun toString(): String = "ConflatedBroadcastChannel"
+        override val isConflated: Boolean get() = true
+    }
+    ;
+
+    abstract fun create(): Channel<Int>
+    open val isConflated: Boolean get() = false
+}
+
+private class ChannelViaBroadcast<E>(
+    private val broadcast: BroadcastChannel<E>
+): Channel<E>, SendChannel<E> by broadcast {
+    val sub = broadcast.openSubscription()
+
+    override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
+    override val isEmpty: Boolean get() = sub.isEmpty
+
+    // Workaround for KT-23094
+    override suspend fun send(element: E) = broadcast.send(element)
+
+    override suspend fun receive(): E = sub.receive()
+    override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
+    override fun poll(): E? = sub.poll()
+    override fun iterator(): ChannelIterator<E> = sub.iterator()
+    override fun cancel(cause: Throwable?): Boolean = sub.cancel(cause)
+    override val onReceive: SelectClause1<E>
+        get() = sub.onReceive
+    override val onReceiveOrNull: SelectClause1<E?>
+        get() = sub.onReceiveOrNull
+}