ArrayBroadcastChannel
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index 95c81a3..fb0396e 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -23,26 +23,15 @@
 import kotlin.coroutines.experimental.startCoroutine
 
 /**
- * Abstract channel. It is a base class for all channel implementations.
+ * Abstract send channel. It is a base class for all send channel implementations.
  */
-public abstract class AbstractChannel<E> : Channel<E> {
-    private val queue = LockFreeLinkedListHead()
+public abstract class AbstractSendChannel<E> : SendChannel<E> {
+    /** @suppress **This is unstable API and it is subject to change.** */
+    protected val queue = LockFreeLinkedListHead()
 
     // ------ extension points for buffered channels ------
 
     /**
-     * Returns `true` if [isBufferEmpty] is always `true`.
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    protected abstract val isBufferAlwaysEmpty: Boolean
-
-    /**
-     * Returns `true` if this channel's buffer is empty.
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    protected abstract val isBufferEmpty: Boolean
-
-    /**
      * Returns `true` if [isBufferFull] is always `true`.
      * @suppress **This is unstable API and it is subject to change.**
      */
@@ -87,55 +76,26 @@
         return receive.offerResult
     }
 
-    /**
-     * Tries to remove element from buffer or from queued sender.
-     * Return type is `E | POLL_FAILED | Closed`
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    protected open fun pollInternal(): Any? {
-        while (true) {
-            val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
-            val token = send.tryResumeSend(idempotent = null)
-            if (token != null) {
-                send.completeResumeSend(token)
-                return send.pollResult
-            }
-        }
-    }
-
-    /**
-     * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
-     * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
-        // poll atomically with select
-        val pollOp = describeTryPoll()
-        val failure = select.performAtomicTrySelect(pollOp)
-        if (failure != null) return failure
-        val send = pollOp.result
-        send.completeResumeSend(pollOp.resumeToken!!)
-        return pollOp.pollResult
-    }
-
     // ------ state functions & helpers for concrete implementations ------
 
     /**
-     * Returns non-null closed token if it is first in the queue.
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    protected val closedForReceive: Any? get() = queue.next as? Closed<*>
-
-    /**
      * Returns non-null closed token if it is last in the queue.
      * @suppress **This is unstable API and it is subject to change.**
      */
-    protected val closedForSend: ReceiveOrClosed<*>? get() = queue.prev as? Closed<*>
+    protected val closedForSend: Closed<*>? get() = queue.prev as? Closed<*>
 
     /**
+     * Returns non-null closed token if it is first in the queue.
      * @suppress **This is unstable API and it is subject to change.**
      */
-    protected val hasReceiveOrClosed: Boolean get() = queue.next is ReceiveOrClosed<*>
+    protected val closedForReceive: Closed<*>? get() = queue.next as? Closed<*>
+
+    /**
+     * Retrieves first sending waiter from the queue or returns closed token.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected fun takeFirstSendOrPeekClosed(): Send? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
 
     /**
      * @suppress **This is unstable API and it is subject to change.**
@@ -240,7 +200,7 @@
             queue.addLastIfPrev(send, { it !is ReceiveOrClosed<*> }) else
             queue.addLastIfPrevAndIf(send, { it !is ReceiveOrClosed<*> }, { isBufferFull })
 
-    public final override fun close(cause: Throwable?): Boolean {
+    public override fun close(cause: Throwable?): Boolean {
         val closed = Closed<E>(cause)
         while (true) {
             val receive = takeFirstReceiveOrPeekClosed()
@@ -351,6 +311,99 @@
         }
     }
 
+    // ------ private ------
+
+    private class SendSelect<R>(
+        override val pollResult: Any?,
+        @JvmField val select: SelectInstance<R>,
+        @JvmField val block: suspend () -> R
+    ) : LockFreeLinkedListNode(), Send, DisposableHandle {
+        override fun tryResumeSend(idempotent: Any?): Any? =
+            if (select.trySelect(idempotent)) SELECT_STARTED else null
+
+        override fun completeResumeSend(token: Any) {
+            check(token === SELECT_STARTED)
+            block.startCoroutine(select.completion)
+        }
+
+        fun disposeOnSelect() {
+            select.disposeOnSelect(this)
+        }
+
+        override fun dispose() {
+            remove()
+        }
+
+        override fun toString(): String = "SendSelect($pollResult)[$select]"
+    }
+
+    private class SendBuffered<out E>(
+        @JvmField val element: E
+    ) : LockFreeLinkedListNode(), Send {
+        override val pollResult: Any? get() = element
+        override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
+        override fun completeResumeSend(token: Any) { check(token === SEND_RESUMED) }
+    }
+}
+
+/**
+ * Abstract send/receive channel. It is a base class for all channel implementations.
+ */
+public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E> {
+    // ------ extension points for buffered channels ------
+
+    /**
+     * Returns `true` if [isBufferEmpty] is always `true`.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected abstract val isBufferAlwaysEmpty: Boolean
+
+    /**
+     * Returns `true` if this channel's buffer is empty.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected abstract val isBufferEmpty: Boolean
+
+    // ------ internal functions for override by buffered channels ------
+
+    /**
+     * Tries to remove element from buffer or from queued sender.
+     * Return type is `E | POLL_FAILED | Closed`
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open fun pollInternal(): Any? {
+        while (true) {
+            val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
+            val token = send.tryResumeSend(idempotent = null)
+            if (token != null) {
+                send.completeResumeSend(token)
+                return send.pollResult
+            }
+        }
+    }
+
+    /**
+     * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
+     * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
+        // poll atomically with select
+        val pollOp = describeTryPoll()
+        val failure = select.performAtomicTrySelect(pollOp)
+        if (failure != null) return failure
+        val send = pollOp.result
+        send.completeResumeSend(pollOp.resumeToken!!)
+        return pollOp.pollResult
+    }
+
+    // ------ state functions & helpers for concrete implementations ------
+
+    /**
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    protected val hasReceiveOrClosed: Boolean get() = queue.next is ReceiveOrClosed<*>
+
     // ------ ReceiveChannel ------
 
     public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
@@ -450,14 +503,7 @@
         return if (result === POLL_FAILED) null else receiveOrNullResult(result)
     }
 
-    public final override fun iterator(): ChannelIterator<E> = Iterator(this)
-
-    /**
-     * Retrieves first sending waiter from the queue or returns closed token.
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    protected fun takeFirstSendOrPeekClosed(): Send? =
-        queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
+    public final override fun iterator(): ChannelIterator<E> = Itr(this)
 
     // ------ registerSelectReceive ------
 
@@ -576,35 +622,6 @@
 
     // ------ protected ------
 
-    protected companion object {
-        /** @suppress **This is unstable API and it is subject to change.** */
-        @JvmField
-        val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
-
-        /** @suppress **This is unstable API and it is subject to change.** */
-        @JvmField
-        val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
-
-        /** @suppress **This is unstable API and it is subject to change.** */
-        @JvmField
-        val POLL_FAILED: Any = Symbol("POLL_FAILED")
-
-        /** @suppress **This is unstable API and it is subject to change.** */
-        @JvmField
-        val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
-
-        private val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
-
-        private val NULL_VALUE: Any = Symbol("NULL_VALUE")
-
-        private val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
-
-        private val SEND_RESUMED = Symbol("SEND_RESUMED")
-
-        /** @suppress **This is unstable API and it is subject to change.** */
-        fun isClosed(result: Any?): Boolean = result is Closed<*>
-    }
-
     /**
      * Invoked when receiver is successfully enqueued to the queue of waiting receivers.
      */
@@ -624,7 +641,7 @@
         }
     }
 
-    private class Iterator<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
+    private class Itr<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
         var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed
 
         suspend override fun hasNext(): Boolean {
@@ -683,92 +700,6 @@
         }
     }
 
-    /**
-     * Represents sending waiter in the queue.
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    protected interface Send {
-        val pollResult: Any? // E | Closed
-        fun tryResumeSend(idempotent: Any?): Any?
-        fun completeResumeSend(token: Any)
-    }
-
-    /**
-     * Represents receiver waiter in the queue or closed token.
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    protected interface ReceiveOrClosed<in E> {
-        val offerResult: Any // OFFER_SUCCESS | Closed
-        fun tryResumeReceive(value: E, idempotent: Any?): Any?
-        fun completeResumeReceive(token: Any)
-    }
-
-    @Suppress("UNCHECKED_CAST")
-    private class SendElement(
-        override val pollResult: Any?,
-        @JvmField val cont: CancellableContinuation<Unit>
-    ) : LockFreeLinkedListNode(), Send {
-        override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
-        override fun completeResumeSend(token: Any) = cont.completeResume(token)
-        override fun toString(): String = "SendElement($pollResult)[$cont]"
-    }
-
-    private class SendSelect<R>(
-        override val pollResult: Any?,
-        @JvmField val select: SelectInstance<R>,
-        @JvmField val block: suspend () -> R
-    ) : LockFreeLinkedListNode(), Send, DisposableHandle {
-        override fun tryResumeSend(idempotent: Any?): Any? =
-            if (select.trySelect(idempotent)) SELECT_STARTED else null
-
-        override fun completeResumeSend(token: Any) {
-            check(token === SELECT_STARTED)
-            block.startCoroutine(select.completion)
-        }
-
-        fun disposeOnSelect() {
-            select.disposeOnSelect(this)
-        }
-
-        override fun dispose() {
-            remove()
-        }
-
-        override fun toString(): String = "SendSelect($pollResult)[$select]"
-    }
-
-    private class SendBuffered<out E>(
-        @JvmField val element: E
-    ) : LockFreeLinkedListNode(), Send {
-        override val pollResult: Any? get() = element
-        override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
-        override fun completeResumeSend(token: Any) { check(token === SEND_RESUMED) }
-    }
-
-    /**
-     * Represents closed channel.
-     * @suppress **This is unstable API and it is subject to change.**
-     */
-    protected class Closed<in E>(
-        @JvmField val closeCause: Throwable?
-    ) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
-        val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
-        val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
-
-        override val offerResult get() = this
-        override val pollResult get() = this
-        override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED
-        override fun completeResumeSend(token: Any) { check(token === CLOSE_RESUMED) }
-        override fun tryResumeReceive(value: E, idempotent: Any?): Any? = throw sendException
-        override fun completeResumeReceive(token: Any) = throw sendException
-        override fun toString(): String = "Closed[$closeCause]"
-    }
-
-    private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
-        override val offerResult get() = OFFER_SUCCESS
-        abstract fun resumeReceiveClosed(closed: Closed<*>)
-    }
-
     private class ReceiveElement<in E>(
         @JvmField val cont: CancellableContinuation<E?>,
         @JvmField val nullOnClose: Boolean
@@ -784,13 +715,8 @@
         override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
     }
 
-    private class IdempotentTokenValue<out E>(
-        @JvmField val token: Any,
-        @JvmField val value: E
-    )
-
     private class ReceiveHasNext<E>(
-        @JvmField val iterator: Iterator<E>,
+        @JvmField val iterator: Itr<E>,
         @JvmField val cont: CancellableContinuation<Boolean>
     ) : Receive<E>() {
         override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
@@ -862,4 +788,92 @@
 
         override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
     }
+
+    private class IdempotentTokenValue<out E>(
+        @JvmField val token: Any,
+        @JvmField val value: E
+    )
 }
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val POLL_FAILED: Any = Symbol("POLL_FAILED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val NULL_VALUE: Any = Symbol("NULL_VALUE")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val SEND_RESUMED = Symbol("SEND_RESUMED")
+
+/**
+ * Represents sending waiter in the queue.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public interface Send {
+    val pollResult: Any? // E | Closed
+    fun tryResumeSend(idempotent: Any?): Any?
+    fun completeResumeSend(token: Any)
+}
+
+/**
+ * Represents receiver waiter in the queue or closed token.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public interface ReceiveOrClosed<in E> {
+    val offerResult: Any // OFFER_SUCCESS | Closed
+    fun tryResumeReceive(value: E, idempotent: Any?): Any?
+    fun completeResumeReceive(token: Any)
+}
+
+/**
+ * Represents closed channel.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+@Suppress("UNCHECKED_CAST")
+public class SendElement(
+    override val pollResult: Any?,
+    @JvmField val cont: CancellableContinuation<Unit>
+) : LockFreeLinkedListNode(), Send {
+    override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
+    override fun completeResumeSend(token: Any) = cont.completeResume(token)
+    override fun toString(): String = "SendElement($pollResult)[$cont]"
+}
+
+/**
+ * Represents closed channel.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public class Closed<in E>(
+    @JvmField val closeCause: Throwable?
+) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
+    val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
+    val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
+
+    override val offerResult get() = this
+    override val pollResult get() = this
+    override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED
+    override fun completeResumeSend(token: Any) { check(token === CLOSE_RESUMED) }
+    override fun tryResumeReceive(value: E, idempotent: Any?): Any? = throw sendException
+    override fun completeResumeReceive(token: Any) = throw sendException
+    override fun toString(): String = "Closed[$closeCause]"
+}
+
+private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
+    override val offerResult get() = OFFER_SUCCESS
+    abstract fun resumeReceiveClosed(closed: Closed<*>)
+}
+
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
new file mode 100644
index 0000000..128e500
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
@@ -0,0 +1,346 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.ALREADY_SELECTED
+import kotlinx.coroutines.experimental.selects.SelectInstance
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.locks.ReentrantLock
+import kotlin.concurrent.withLock
+
+/**
+ * Broadcast channel with array buffer of a fixed [capacity].
+ * Sender suspends only when buffer is fully due to one of the receives not being late and
+ * receiver suspends only when buffer is empty.
+ *
+ * Note, that elements that are sent to the broadcast channel while there are no [open] subscribers are immediately
+ * lost.
+ *
+ * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
+ * The lock at each subscription is also used to manage concurrent attempts to receive from the same subscriber.
+ * The lists of suspended senders or receivers are lock-free.
+ */
+class ArrayBroadcastChannel<E>(
+    /**
+     * Buffer capacity.
+     */
+    val capacity: Int
+) : AbstractSendChannel<E>(), BroadcastChannel<E> {
+    init {
+        check(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
+    }
+
+    private val bufferLock = ReentrantLock()
+    private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity) // guarded by lock
+
+    // head & tail are Long (64 bits) and we assume that they never wrap around
+    // head, tail, and size are guarded by bufferLock
+    @Volatile
+    private var head: Long = 0 // do modulo on use of head
+    @Volatile
+    private var tail: Long = 0 // do modulo on use of tail
+    @Volatile
+    private var size: Int = 0
+
+    private val subs = CopyOnWriteArrayList<Subscriber<E>>()
+
+    override val isBufferAlwaysFull: Boolean get() = false
+    override val isBufferFull: Boolean get() = size >= capacity
+
+    override fun open(): SubscriptionReceiveChannel<E> {
+        val sub = Subscriber(this, head)
+        subs.add(sub)
+        // between creating and adding of subscription into the list the buffer head could have been bumped past it,
+        // so here we check if it did happen and update the head in subscription in this case
+        // we did not leak newly created subscription yet, so its subHead cannot update
+        val head = this.head // volatile read after sub was added to subs
+        if (head != sub.subHead) {
+            // needs update
+            sub.subHead = head
+            updateHead() // and also must recompute head of the buffer
+        }
+        return sub
+    }
+
+    override fun close(cause: Throwable?): Boolean {
+        if (!super.close(cause)) return false
+        checkSubOffers()
+        return true
+    }
+
+    // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
+    override fun offerInternal(element: E): Any {
+        bufferLock.withLock {
+            // check if closed for send (under lock, so size cannot change)
+            closedForSend?.let { return it }
+            val size = this.size
+            if (size >= capacity) return OFFER_FAILED
+            val tail = this.tail
+            buffer[(tail % capacity).toInt()] = element
+            this.size = size + 1
+            this.tail = tail + 1
+        }
+        // if offered successfully, then check subs outside of lock
+        checkSubOffers()
+        return OFFER_SUCCESS
+    }
+
+    // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
+    override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+        bufferLock.withLock {
+            // check if closed for send (under lock, so size cannot change)
+            closedForSend?.let { return it }
+            val size = this.size
+            if (size >= capacity) return OFFER_FAILED
+            // let's try to select sending this element to buffer
+            if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
+                return ALREADY_SELECTED
+            }
+            val tail = this.tail
+            buffer[(tail % capacity).toInt()] = element
+            this.size = size + 1
+            this.tail = tail + 1
+        }
+        // if offered successfully, then check subs outside of lock
+        checkSubOffers()
+        return OFFER_SUCCESS
+    }
+
+    private fun closeSubscriber(sub: Subscriber<E>) {
+        subs.remove(sub)
+        if (head == sub.subHead)
+            updateHead()
+    }
+
+    private fun checkSubOffers() {
+        var updated = false
+        @Suppress("LoopToCallChain") // must invoke `checkOffer` on every sub
+        for (sub in subs) {
+            if (sub.checkOffer()) updated = true
+        }
+        if (updated)
+            updateHead()
+    }
+
+    private fun updateHead() {
+        // compute minHead w/o lock (it will be eventually consistent)
+        val minHead = computeMinHead()
+        // update head in a loop
+        while (true) {
+            var send: Send? = null
+            var token: Any? = null
+            bufferLock.withLock {
+                val tail = this.tail
+                var head = this.head
+                val targetHead = minHead.coerceAtMost(tail)
+                if (targetHead <= head) return // nothing to do -- head was already moved
+                var size = this.size
+                // clean up removed (on not need if we don't have any subscribers anymore)
+                while (head < targetHead) {
+                    buffer[(head % capacity).toInt()] = null
+                    val wasFull = size >= capacity
+                    // update the size before checking queue (no more senders can queue up)
+                    this.head = ++head
+                    this.size = --size
+                    if (wasFull) {
+                        while (true) {
+                            send = takeFirstSendOrPeekClosed() ?: break // when when no sender
+                            if (send is Closed<*>) break // break when closed for send
+                            token = send!!.tryResumeSend(idempotent = null)
+                            if (token != null) {
+                                // put sent element to the buffer
+                                buffer[(tail % capacity).toInt()] = (send as Send).pollResult
+                                this.size = size + 1
+                                this.tail = tail + 1
+                                return@withLock // go out of lock to wakeup this sender
+                            }
+                        }
+                    }
+                }
+                return // done updating here -> return
+            }
+            // we only get out of the lock normally when there is a sender to resume
+            send!!.completeResumeSend(token!!)
+            // since we've just sent an element, we might need to resume some receivers
+            checkSubOffers()
+        }
+    }
+
+    private fun computeMinHead(): Long {
+        var minHead = Long.MAX_VALUE
+        for (sub in subs)
+            minHead = minHead.coerceAtMost(sub.subHead) // volatile (atomic) reads of subHead
+        return minHead
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    private fun elementAt(index: Long): E = buffer[(index % capacity).toInt()] as E
+
+    private class Subscriber<E>(
+        private val broadcastChannel: ArrayBroadcastChannel<E>,
+        @Volatile @JvmField var subHead: Long // guarded by lock
+    ) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
+        private val lock = ReentrantLock()
+
+        override val isBufferAlwaysEmpty: Boolean get() = false
+        override val isBufferEmpty: Boolean get() = subHead >= broadcastChannel.tail
+        override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
+        override val isBufferFull: Boolean get() = error("Should not be used")
+
+        override fun close() {
+            if (close(cause = null))
+                broadcastChannel.closeSubscriber(this)
+        }
+
+        // returns true if subHead was updated and broadcast channel's head must be checked
+        // this method is lock-free (it never waits on lock)
+        @Suppress("UNCHECKED_CAST")
+        fun checkOffer(): Boolean {
+            var updated = false
+            var closed: Closed<*>? = null
+        loop@
+            while (needsToCheckOfferWithoutLock()) {
+                // just use `tryLock` here and break when some other thread is checking under lock
+                // it means that `checkOffer` must be retried after every `unlock`
+                if (!lock.tryLock()) break
+                val receive: ReceiveOrClosed<E>?
+                val token: Any?
+                try {
+                    val result = peekUnderLock()
+                    when {
+                        result === POLL_FAILED -> continue@loop // must retest `needsToCheckOfferWithoutLock` outside of the lock
+                        result is Closed<*> -> {
+                            closed = result
+                            break@loop // was closed
+                        }
+                    }
+                    // find a receiver for an element
+                    receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving
+                    if (receive is Closed<*>) break // noting more to do if this sub already closed
+                    token = receive.tryResumeReceive(result as E, idempotent = null)
+                    if (token == null) continue // bail out here to next iteration (see for next receiver)
+                    val subHead = this.subHead
+                    this.subHead = subHead + 1 // retrieved element for this subscriber
+                    updated = true
+                } finally {
+                    lock.unlock()
+                }
+                receive!!.completeResumeReceive(token!!)
+            }
+            // do close outside of lock if needed
+            closed?.also { close(cause = it.closeCause) }
+            return updated
+        }
+
+        // result is `E | POLL_FAILED | Closed`
+        override fun pollInternal(): Any? {
+            var updated = false
+            val result: Any?
+            lock.lock()
+            try {
+                result = peekUnderLock()
+                when {
+                    result is Closed<*> -> { /* just bail out of lock */ }
+                    result === POLL_FAILED -> { /* just bail out of lock */ }
+                    else -> {
+                        // update subHead after retrieiving element from buffer
+                        val subHead = this.subHead
+                        this.subHead = subHead + 1
+                        updated = true
+                    }
+                }
+            } finally {
+                lock.unlock()
+            }
+            // do close outside of lock
+            (result as? Closed<*>)?.also { close(cause = it.closeCause) }
+            // there could have been checkOffer attempt while we were holding lock
+            // now outside the lock recheck if anything else to offer
+            if (checkOffer())
+                updated = true
+            // and finally update broadcast's channel head if needed
+            if (updated)
+                broadcastChannel.updateHead()
+            return result
+        }
+
+        // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+        override fun pollSelectInternal(select: SelectInstance<*>): Any? {
+            var updated = false
+            var result: Any?
+            lock.lock()
+            try {
+                result = peekUnderLock()
+                when {
+                    result is Closed<*> -> { /* just bail out of lock */ }
+                    result === POLL_FAILED -> { /* just bail out of lock */ }
+                    else -> {
+                        // let's try to select receiving this element from buffer
+                        if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
+                            result = ALREADY_SELECTED
+                        } else {
+                            // update subHead after retrieiving element from buffer
+                            val subHead = this.subHead
+                            this.subHead = subHead + 1
+                            updated = true
+                        }
+                    }
+                }
+            } finally {
+                lock.unlock()
+            }
+            // do close outside of lock
+            (result as? Closed<*>)?.also { close(cause = it.closeCause) }
+            // there could have been checkOffer attempt while we were holding lock
+            // now outside the lock recheck if anything else to offer
+            if (checkOffer())
+                updated = true
+            // and finally update broadcast's channel head if needed
+            if (updated)
+                broadcastChannel.updateHead()
+            return result
+        }
+
+        // Must invoke this check this after lock, because offer's invocation of `checkOffer` might have failed
+        // to `tryLock` just before the lock was about to unlocked, thus loosing notification to this
+        // subscription about an element that was just offered
+        private fun needsToCheckOfferWithoutLock(): Boolean {
+            if (closedForReceive != null)
+                return false // already closed -> nothing to do
+            if (isBufferEmpty && broadcastChannel.closedForReceive == null)
+                return false // no data for us && broadcast channel was not closed yet -> nothing to do
+            return true // check otherwise
+        }
+
+        // guarded by lock, returns:
+        //      E - the element from the buffer at subHead
+        //      Closed<*> when closed;
+        //      POLL_FAILED when there seems to be no data, but must retest `needsToCheckOfferWithoutLock` outside of lock
+        private fun peekUnderLock(): Any? {
+            val subHead = this.subHead // guarded read (can be non-volatile read)
+            // note: from the broadcastChannel we must read closed token first, then read its tail
+            // because it is Ok if tail moves in between the reads (we make decision based on tail first)
+            val closed = broadcastChannel.closedForReceive // unguarded volatile read
+            val tail = broadcastChannel.tail // unguarded volatile read
+            if (subHead >= tail) {
+                // no elements to poll from the queue -- check if closed
+                return closed ?: POLL_FAILED // must retest `needsToCheckOfferWithoutLock` outside of the lock
+            }
+            return broadcastChannel.elementAt(subHead)
+        }
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
index c60fe63..6a7afc7 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -19,6 +19,7 @@
 import kotlinx.coroutines.experimental.ALREADY_SELECTED
 import kotlinx.coroutines.experimental.selects.SelectInstance
 import java.util.concurrent.locks.ReentrantLock
+import kotlin.concurrent.withLock
 
 /**
  * Channel with array buffer of a fixed [capacity].
@@ -43,12 +44,6 @@
     @Volatile
     private var size: Int = 0
 
-    private inline fun <T> locked(block: () -> T): T {
-        lock.lock()
-        return try { block() }
-        finally { lock.unlock() }
-    }
-
     protected final override val isBufferAlwaysEmpty: Boolean get() = false
     protected final override val isBufferEmpty: Boolean get() = size == 0
     protected final override val isBufferAlwaysFull: Boolean get() = false
@@ -58,7 +53,7 @@
     protected override fun offerInternal(element: E): Any {
         var receive: ReceiveOrClosed<E>? = null
         var token: Any? = null
-        locked {
+        lock.withLock {
             val size = this.size
             closedForSend?.let { return it }
             if (size < capacity) {
@@ -75,7 +70,7 @@
                         token = receive!!.tryResumeReceive(element, idempotent = null)
                         if (token != null) {
                             this.size = size // restore size
-                            return@locked
+                            return@withLock
                         }
                     }
                 }
@@ -90,11 +85,11 @@
         return receive!!.offerResult
     }
 
-    // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
+    // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
     protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
         var receive: ReceiveOrClosed<E>? = null
         var token: Any? = null
-        locked {
+        lock.withLock {
             val size = this.size
             closedForSend?.let { return it }
             if (size < capacity) {
@@ -111,7 +106,7 @@
                                 receive = offerOp.result
                                 token = offerOp.resumeToken
                                 check(token != null)
-                                return@locked
+                                return@withLock
                             }
                             failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
                             failure === ALREADY_SELECTED || failure is Closed<*> -> {
@@ -123,7 +118,7 @@
                     }
                 }
                 // let's try to select sending this element to buffer
-                if (!select.trySelect(null)) {
+                if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
                     this.size = size // restore size
                     return ALREADY_SELECTED
                 }
@@ -143,9 +138,9 @@
         var send: Send? = null
         var token: Any? = null
         var result: Any? = null
-        locked {
+        lock.withLock {
             val size = this.size
-            if (size == 0) return closedForSend ?: POLL_FAILED
+            if (size == 0) return closedForSend ?: POLL_FAILED // when nothing can be read from buffer
             // size > 0: not empty -- retrieve element
             result = buffer[head]
             buffer[head] = null
@@ -162,7 +157,7 @@
                     }
                 }
             }
-            if (replacement !== POLL_FAILED && !isClosed(replacement)) {
+            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
                 this.size = size // restore size
                 buffer[(head + size) % capacity] = replacement
             }
@@ -179,7 +174,7 @@
         var send: Send? = null
         var token: Any? = null
         var result: Any? = null
-        locked {
+        lock.withLock {
             val size = this.size
             if (size == 0) return closedForSend ?: POLL_FAILED
             // size > 0: not empty -- retrieve element
@@ -216,12 +211,12 @@
                     }
                 }
             }
-            if (replacement !== POLL_FAILED && !isClosed(replacement)) {
+            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
                 this.size = size // restore size
                 buffer[(head + size) % capacity] = replacement
             } else {
                 // failed to poll or is already closed --> let's try to select receiving this element from buffer
-                if (!select.trySelect(null)) {
+                if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
                     this.size = size // restore size
                     buffer[head] = result // restore head
                     return ALREADY_SELECTED
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
index d34f865..177715b 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
@@ -16,6 +16,7 @@
 
 package kotlinx.coroutines.experimental.internal
 
-internal class Symbol(val symbol: String) {
+/** @suppress **This is unstable API and it is subject to change.** */
+public class Symbol(val symbol: String) {
     override fun toString(): String = symbol
 }
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
new file mode 100644
index 0000000..bc9e533
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.hamcrest.core.IsEqual
+import org.hamcrest.core.IsNull
+import org.junit.Assert.*
+import org.junit.Test
+
+class ArrayBroadcastChannelTest : TestBase() {
+    @Test
+    fun testBasic() = runBlocking<Unit> {
+        expect(1)
+        val broadcast = ArrayBroadcastChannel<Int>(1)
+        assertThat(broadcast.isClosedForSend, IsEqual(false))
+        val first = broadcast.open()
+        launch(context, CoroutineStart.UNDISPATCHED) {
+            expect(2)
+            assertThat(first.receive(), IsEqual(1)) // suspends
+            assertThat(first.isClosedForReceive, IsEqual(false))
+            expect(5)
+            assertThat(first.receive(), IsEqual(2)) // suspends
+            assertThat(first.isClosedForReceive, IsEqual(false))
+            expect(10)
+            assertThat(first.receiveOrNull(), IsNull()) // suspends
+            assertThat(first.isClosedForReceive, IsEqual(true))
+            expect(14)
+        }
+        expect(3)
+        broadcast.send(1)
+        expect(4)
+        yield() // to the first receiver
+        expect(6)
+        val second = broadcast.open()
+        launch(context, CoroutineStart.UNDISPATCHED) {
+            expect(7)
+            assertThat(second.receive(), IsEqual(2)) // suspends
+            assertThat(second.isClosedForReceive, IsEqual(false))
+            expect(11)
+            assertThat(second.receiveOrNull(), IsNull()) // suspends
+            assertThat(second.isClosedForReceive, IsEqual(true))
+            expect(15)
+        }
+        expect(8)
+        broadcast.send(2)
+        expect(9)
+        yield() // to first & second receivers
+        expect(12)
+        broadcast.close()
+        expect(13)
+        assertThat(broadcast.isClosedForSend, IsEqual(true))
+        yield() // to first & second receivers
+        finish(16)
+    }
+
+    @Test
+    fun testSendSuspend() = runBlocking {
+        expect(1)
+        val broadcast = ArrayBroadcastChannel<Int>(1)
+        val first = broadcast.open()
+        launch(context) {
+            expect(4)
+            assertThat(first.receive(), IsEqual(1))
+            expect(5)
+            assertThat(first.receive(), IsEqual(2))
+            expect(6)
+        }
+        expect(2)
+        broadcast.send(1) // puts to buffer, receiver not running yet
+        expect(3)
+        broadcast.send(2) // suspends
+        finish(7)
+    }
+
+    @Test
+    fun testConcurrentSendCompletion() = runBlocking {
+        expect(1)
+        val broadcast = ArrayBroadcastChannel<Int>(1)
+        val sub = broadcast.open()
+        // launch 3 concurrent senders (one goes buffer, two other suspend)
+        for (x in 1..3) {
+            launch(context, CoroutineStart.UNDISPATCHED) {
+                expect(x + 1)
+                broadcast.send(x)
+            }
+        }
+        // and close it for send
+        expect(5)
+        broadcast.close()
+        // now must receive all 3 items
+        expect(6)
+        assertThat(sub.isClosedForReceive, IsEqual(false))
+        for (x in 1..3)
+            assertThat(sub.receiveOrNull(), IsEqual(x))
+        // and receive close signal
+        assertThat(sub.receiveOrNull(), IsNull())
+        assertThat(sub.isClosedForReceive, IsEqual(true))
+        finish(7)
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
index 93df270..e595f71 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
@@ -99,7 +99,7 @@
         println("      Duplicated $dupCnt ints")
         failed.get()?.let { throw it }
         assertEquals(0, dupCnt)
-        if (kind != TestChannelKind.CONFLATED) {
+        if (!kind.isConflated) {
             assertEquals(0, missedCnt)
             assertEquals(lastSent, lastReceived)
         }
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
index e2ca9c0..f3e9a89 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
@@ -46,7 +46,7 @@
     val timeLimit = 30_000L * stressTestMultiplier // 30 sec
     val nEvents = 1_000_000 * stressTestMultiplier
 
-    val maxBuffer = 10_000 // artifical limit for LinkedListChannel
+    val maxBuffer = 10_000 // artificial limit for LinkedListChannel
 
     val channel = kind.create()
     val sendersCompleted = AtomicInteger()
@@ -112,7 +112,7 @@
         assertEquals(nReceivers, receiversCompleted.get())
         assertEquals(0, dupes.get())
         assertEquals(nEvents, sentTotal.get())
-        if (kind != TestChannelKind.CONFLATED) assertEquals(nEvents, receivedTotal.get())
+        if (!kind.isConflated) assertEquals(nEvents, receivedTotal.get())
         repeat(nReceivers) { receiveIndex ->
             assertTrue("Each receiver should have received something", receivedBy[receiveIndex] > 0)
         }
@@ -120,7 +120,7 @@
 
     private suspend fun doSent() {
         sentTotal.incrementAndGet()
-        if (kind != TestChannelKind.CONFLATED) {
+        if (!kind.isConflated) {
             while (sentTotal.get() > receivedTotal.get() + maxBuffer)
                 yield() // throttle fast senders to prevent OOM with LinkedListChannel
         }
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
index 5021cbc..32cde4a 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
@@ -29,14 +29,17 @@
 @RunWith(Parameterized::class)
 class SimpleSendReceiveTest(
     val kind: TestChannelKind,
-    val n: Int
+    val n: Int,
+    val concurrent: Boolean
 ) {
     companion object {
-        @Parameterized.Parameters(name = "{0}, n={1}")
+        @Parameterized.Parameters(name = "{0}, n={1}, concurrent={2}")
         @JvmStatic
         fun params(): Collection<Array<Any>> = TestChannelKind.values().flatMap { kind ->
-            listOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100, 1000).map { n ->
-                arrayOf<Any>(kind, n)
+            listOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100, 1000).flatMap { n ->
+                listOf(false, true).map { concurrent ->
+                    arrayOf<Any>(kind, n, concurrent)
+                }
             }
         }
     }
@@ -45,20 +48,23 @@
 
     @Test
     fun testSimpleSendReceive() = runBlocking {
-        launch(CommonPool) {
+        val ctx = if (concurrent) CommonPool else context
+        launch(ctx) {
             repeat(n) { channel.send(it) }
             channel.close()
         }
         var expected = 0
         for (x in channel) {
-            if (kind != TestChannelKind.CONFLATED) {
+            if (!kind.isConflated) {
                 assertThat(x, IsEqual(expected++))
             } else {
                 assertTrue(x >= expected)
                 expected = x + 1
             }
         }
-        if (kind != TestChannelKind.CONFLATED) {
+        if (kind.isConflated) {
+            if (n > 0) assertTrue(expected > 0)
+        } else {
             assertThat(expected, IsEqual(n))
         }
     }
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
index 2a12b03..81e8f7d 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -16,6 +16,8 @@
 
 package kotlinx.coroutines.experimental.channels
 
+import kotlinx.coroutines.experimental.selects.SelectInstance
+
 enum class TestChannelKind {
     RENDEZVOUS {
         override fun create(): Channel<Int> = RendezvousChannel<Int>()
@@ -36,8 +38,40 @@
     CONFLATED {
         override fun create(): Channel<Int> = ConflatedChannel<Int>()
         override fun toString(): String = "ConflatedChannel"
+        override val isConflated: Boolean get() = true
+    },
+    ARRAY_BROADCAST_1 {
+        override fun create(): Channel<Int> = ChannelViaBroadcast(ArrayBroadcastChannel<Int>(1))
+        override fun toString(): String = "ArrayBroadcastChannel(1)"
+    },
+    ARRAY_BROADCAST_10 {
+        override fun create(): Channel<Int> = ChannelViaBroadcast(ArrayBroadcastChannel<Int>(10))
+        override fun toString(): String = "ArrayBroadcastChannel(10)"
+    },
+    CONFLATED_BROADCAST {
+        override fun create(): Channel<Int> = ChannelViaBroadcast(ConflatedBroadcastChannel<Int>())
+        override fun toString(): String = "ConflatedBroadcastChannel"
+        override val isConflated: Boolean get() = true
     }
     ;
 
     abstract fun create(): Channel<Int>
+    open val isConflated: Boolean get() = false
+}
+
+private class ChannelViaBroadcast<E>(
+    private val broadcast: BroadcastChannel<E>
+): Channel<E>, SendChannel<E> by broadcast {
+    val sub = broadcast.open()
+
+    override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
+    override val isEmpty: Boolean get() = sub.isEmpty
+    suspend override fun receive(): E = sub.receive()
+    suspend override fun receiveOrNull(): E? = sub.receiveOrNull()
+    override fun poll(): E? = sub.poll()
+    override fun iterator(): ChannelIterator<E> = sub.iterator()
+    override fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) =
+        sub.registerSelectReceive(select, block)
+    override fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) =
+        sub.registerSelectReceiveOrNull(select, block)
 }
diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md
index b8bf0b7..3e49212 100644
--- a/reactive/coroutines-guide-reactive.md
+++ b/reactive/coroutines-guide-reactive.md
@@ -384,7 +384,7 @@
 effectively broadcasts elements to all its subscribers. The matching concept in coroutines world is called a 
 [BroadcastChannel]. There is a variety of subjects in Rx with 
 [BehaviorSubject](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/BehaviorSubject.html) being the
-most useful one to manage state:
+the one used to manage state:
 
 <!--- INCLUDE
 import io.reactivex.subjects.BehaviorSubject
@@ -528,6 +528,12 @@
 four
 ```
 
+Another implementation of [BroadcastChannel] is [ArrayBroadcastChannel]. It delivers every event to every
+subscriber since the moment the corresponding subscription is open. It corresponds to 
+[PublishSubject][http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/PublishSubject.html] in Rx.
+The capacity of the buffer in the constructor of `ArrayBroadcastChannel` controls the numbers of elements
+that can be sent before the sender is suspended waiting for receiver to receive those elements.
+
 <!--- TEST -->
 
 ## Operators
@@ -1056,6 +1062,7 @@
 [SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
 [BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-broadcast-channel/index.html
 [ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-conflated-broadcast-channel/index.html
+[ArrayBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-array-broadcast-channel/index.html
 <!--- INDEX kotlinx.coroutines.experimental.selects -->
 [select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
 [whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/while-select.html