ArrayBroadcastChannel
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index 95c81a3..fb0396e 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -23,26 +23,15 @@
import kotlin.coroutines.experimental.startCoroutine
/**
- * Abstract channel. It is a base class for all channel implementations.
+ * Abstract send channel. It is a base class for all send channel implementations.
*/
-public abstract class AbstractChannel<E> : Channel<E> {
- private val queue = LockFreeLinkedListHead()
+public abstract class AbstractSendChannel<E> : SendChannel<E> {
+ /** @suppress **This is unstable API and it is subject to change.** */
+ protected val queue = LockFreeLinkedListHead()
// ------ extension points for buffered channels ------
/**
- * Returns `true` if [isBufferEmpty] is always `true`.
- * @suppress **This is unstable API and it is subject to change.**
- */
- protected abstract val isBufferAlwaysEmpty: Boolean
-
- /**
- * Returns `true` if this channel's buffer is empty.
- * @suppress **This is unstable API and it is subject to change.**
- */
- protected abstract val isBufferEmpty: Boolean
-
- /**
* Returns `true` if [isBufferFull] is always `true`.
* @suppress **This is unstable API and it is subject to change.**
*/
@@ -87,55 +76,26 @@
return receive.offerResult
}
- /**
- * Tries to remove element from buffer or from queued sender.
- * Return type is `E | POLL_FAILED | Closed`
- * @suppress **This is unstable API and it is subject to change.**
- */
- protected open fun pollInternal(): Any? {
- while (true) {
- val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
- val token = send.tryResumeSend(idempotent = null)
- if (token != null) {
- send.completeResumeSend(token)
- return send.pollResult
- }
- }
- }
-
- /**
- * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
- * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
- * @suppress **This is unstable API and it is subject to change.**
- */
- protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
- // poll atomically with select
- val pollOp = describeTryPoll()
- val failure = select.performAtomicTrySelect(pollOp)
- if (failure != null) return failure
- val send = pollOp.result
- send.completeResumeSend(pollOp.resumeToken!!)
- return pollOp.pollResult
- }
-
// ------ state functions & helpers for concrete implementations ------
/**
- * Returns non-null closed token if it is first in the queue.
- * @suppress **This is unstable API and it is subject to change.**
- */
- protected val closedForReceive: Any? get() = queue.next as? Closed<*>
-
- /**
* Returns non-null closed token if it is last in the queue.
* @suppress **This is unstable API and it is subject to change.**
*/
- protected val closedForSend: ReceiveOrClosed<*>? get() = queue.prev as? Closed<*>
+ protected val closedForSend: Closed<*>? get() = queue.prev as? Closed<*>
/**
+ * Returns non-null closed token if it is first in the queue.
* @suppress **This is unstable API and it is subject to change.**
*/
- protected val hasReceiveOrClosed: Boolean get() = queue.next is ReceiveOrClosed<*>
+ protected val closedForReceive: Closed<*>? get() = queue.next as? Closed<*>
+
+ /**
+ * Retrieves first sending waiter from the queue or returns closed token.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected fun takeFirstSendOrPeekClosed(): Send? =
+ queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
/**
* @suppress **This is unstable API and it is subject to change.**
@@ -240,7 +200,7 @@
queue.addLastIfPrev(send, { it !is ReceiveOrClosed<*> }) else
queue.addLastIfPrevAndIf(send, { it !is ReceiveOrClosed<*> }, { isBufferFull })
- public final override fun close(cause: Throwable?): Boolean {
+ public override fun close(cause: Throwable?): Boolean {
val closed = Closed<E>(cause)
while (true) {
val receive = takeFirstReceiveOrPeekClosed()
@@ -351,6 +311,99 @@
}
}
+ // ------ private ------
+
+ private class SendSelect<R>(
+ override val pollResult: Any?,
+ @JvmField val select: SelectInstance<R>,
+ @JvmField val block: suspend () -> R
+ ) : LockFreeLinkedListNode(), Send, DisposableHandle {
+ override fun tryResumeSend(idempotent: Any?): Any? =
+ if (select.trySelect(idempotent)) SELECT_STARTED else null
+
+ override fun completeResumeSend(token: Any) {
+ check(token === SELECT_STARTED)
+ block.startCoroutine(select.completion)
+ }
+
+ fun disposeOnSelect() {
+ select.disposeOnSelect(this)
+ }
+
+ override fun dispose() {
+ remove()
+ }
+
+ override fun toString(): String = "SendSelect($pollResult)[$select]"
+ }
+
+ private class SendBuffered<out E>(
+ @JvmField val element: E
+ ) : LockFreeLinkedListNode(), Send {
+ override val pollResult: Any? get() = element
+ override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
+ override fun completeResumeSend(token: Any) { check(token === SEND_RESUMED) }
+ }
+}
+
+/**
+ * Abstract send/receive channel. It is a base class for all channel implementations.
+ */
+public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E> {
+ // ------ extension points for buffered channels ------
+
+ /**
+ * Returns `true` if [isBufferEmpty] is always `true`.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected abstract val isBufferAlwaysEmpty: Boolean
+
+ /**
+ * Returns `true` if this channel's buffer is empty.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected abstract val isBufferEmpty: Boolean
+
+ // ------ internal functions for override by buffered channels ------
+
+ /**
+ * Tries to remove element from buffer or from queued sender.
+ * Return type is `E | POLL_FAILED | Closed`
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected open fun pollInternal(): Any? {
+ while (true) {
+ val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
+ val token = send.tryResumeSend(idempotent = null)
+ if (token != null) {
+ send.completeResumeSend(token)
+ return send.pollResult
+ }
+ }
+ }
+
+ /**
+ * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
+ * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
+ // poll atomically with select
+ val pollOp = describeTryPoll()
+ val failure = select.performAtomicTrySelect(pollOp)
+ if (failure != null) return failure
+ val send = pollOp.result
+ send.completeResumeSend(pollOp.resumeToken!!)
+ return pollOp.pollResult
+ }
+
+ // ------ state functions & helpers for concrete implementations ------
+
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected val hasReceiveOrClosed: Boolean get() = queue.next is ReceiveOrClosed<*>
+
// ------ ReceiveChannel ------
public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
@@ -450,14 +503,7 @@
return if (result === POLL_FAILED) null else receiveOrNullResult(result)
}
- public final override fun iterator(): ChannelIterator<E> = Iterator(this)
-
- /**
- * Retrieves first sending waiter from the queue or returns closed token.
- * @suppress **This is unstable API and it is subject to change.**
- */
- protected fun takeFirstSendOrPeekClosed(): Send? =
- queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
+ public final override fun iterator(): ChannelIterator<E> = Itr(this)
// ------ registerSelectReceive ------
@@ -576,35 +622,6 @@
// ------ protected ------
- protected companion object {
- /** @suppress **This is unstable API and it is subject to change.** */
- @JvmField
- val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
-
- /** @suppress **This is unstable API and it is subject to change.** */
- @JvmField
- val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
-
- /** @suppress **This is unstable API and it is subject to change.** */
- @JvmField
- val POLL_FAILED: Any = Symbol("POLL_FAILED")
-
- /** @suppress **This is unstable API and it is subject to change.** */
- @JvmField
- val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
-
- private val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
-
- private val NULL_VALUE: Any = Symbol("NULL_VALUE")
-
- private val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
-
- private val SEND_RESUMED = Symbol("SEND_RESUMED")
-
- /** @suppress **This is unstable API and it is subject to change.** */
- fun isClosed(result: Any?): Boolean = result is Closed<*>
- }
-
/**
* Invoked when receiver is successfully enqueued to the queue of waiting receivers.
*/
@@ -624,7 +641,7 @@
}
}
- private class Iterator<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
+ private class Itr<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed
suspend override fun hasNext(): Boolean {
@@ -683,92 +700,6 @@
}
}
- /**
- * Represents sending waiter in the queue.
- * @suppress **This is unstable API and it is subject to change.**
- */
- protected interface Send {
- val pollResult: Any? // E | Closed
- fun tryResumeSend(idempotent: Any?): Any?
- fun completeResumeSend(token: Any)
- }
-
- /**
- * Represents receiver waiter in the queue or closed token.
- * @suppress **This is unstable API and it is subject to change.**
- */
- protected interface ReceiveOrClosed<in E> {
- val offerResult: Any // OFFER_SUCCESS | Closed
- fun tryResumeReceive(value: E, idempotent: Any?): Any?
- fun completeResumeReceive(token: Any)
- }
-
- @Suppress("UNCHECKED_CAST")
- private class SendElement(
- override val pollResult: Any?,
- @JvmField val cont: CancellableContinuation<Unit>
- ) : LockFreeLinkedListNode(), Send {
- override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
- override fun completeResumeSend(token: Any) = cont.completeResume(token)
- override fun toString(): String = "SendElement($pollResult)[$cont]"
- }
-
- private class SendSelect<R>(
- override val pollResult: Any?,
- @JvmField val select: SelectInstance<R>,
- @JvmField val block: suspend () -> R
- ) : LockFreeLinkedListNode(), Send, DisposableHandle {
- override fun tryResumeSend(idempotent: Any?): Any? =
- if (select.trySelect(idempotent)) SELECT_STARTED else null
-
- override fun completeResumeSend(token: Any) {
- check(token === SELECT_STARTED)
- block.startCoroutine(select.completion)
- }
-
- fun disposeOnSelect() {
- select.disposeOnSelect(this)
- }
-
- override fun dispose() {
- remove()
- }
-
- override fun toString(): String = "SendSelect($pollResult)[$select]"
- }
-
- private class SendBuffered<out E>(
- @JvmField val element: E
- ) : LockFreeLinkedListNode(), Send {
- override val pollResult: Any? get() = element
- override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
- override fun completeResumeSend(token: Any) { check(token === SEND_RESUMED) }
- }
-
- /**
- * Represents closed channel.
- * @suppress **This is unstable API and it is subject to change.**
- */
- protected class Closed<in E>(
- @JvmField val closeCause: Throwable?
- ) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
- val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
- val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
-
- override val offerResult get() = this
- override val pollResult get() = this
- override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED
- override fun completeResumeSend(token: Any) { check(token === CLOSE_RESUMED) }
- override fun tryResumeReceive(value: E, idempotent: Any?): Any? = throw sendException
- override fun completeResumeReceive(token: Any) = throw sendException
- override fun toString(): String = "Closed[$closeCause]"
- }
-
- private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
- override val offerResult get() = OFFER_SUCCESS
- abstract fun resumeReceiveClosed(closed: Closed<*>)
- }
-
private class ReceiveElement<in E>(
@JvmField val cont: CancellableContinuation<E?>,
@JvmField val nullOnClose: Boolean
@@ -784,13 +715,8 @@
override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
}
- private class IdempotentTokenValue<out E>(
- @JvmField val token: Any,
- @JvmField val value: E
- )
-
private class ReceiveHasNext<E>(
- @JvmField val iterator: Iterator<E>,
+ @JvmField val iterator: Itr<E>,
@JvmField val cont: CancellableContinuation<Boolean>
) : Receive<E>() {
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
@@ -862,4 +788,92 @@
override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
}
+
+ private class IdempotentTokenValue<out E>(
+ @JvmField val token: Any,
+ @JvmField val value: E
+ )
}
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val POLL_FAILED: Any = Symbol("POLL_FAILED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val NULL_VALUE: Any = Symbol("NULL_VALUE")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val SEND_RESUMED = Symbol("SEND_RESUMED")
+
+/**
+ * Represents sending waiter in the queue.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public interface Send {
+ val pollResult: Any? // E | Closed
+ fun tryResumeSend(idempotent: Any?): Any?
+ fun completeResumeSend(token: Any)
+}
+
+/**
+ * Represents receiver waiter in the queue or closed token.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public interface ReceiveOrClosed<in E> {
+ val offerResult: Any // OFFER_SUCCESS | Closed
+ fun tryResumeReceive(value: E, idempotent: Any?): Any?
+ fun completeResumeReceive(token: Any)
+}
+
+/**
+ * Represents closed channel.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+@Suppress("UNCHECKED_CAST")
+public class SendElement(
+ override val pollResult: Any?,
+ @JvmField val cont: CancellableContinuation<Unit>
+) : LockFreeLinkedListNode(), Send {
+ override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
+ override fun completeResumeSend(token: Any) = cont.completeResume(token)
+ override fun toString(): String = "SendElement($pollResult)[$cont]"
+}
+
+/**
+ * Represents closed channel.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public class Closed<in E>(
+ @JvmField val closeCause: Throwable?
+) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
+ val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
+ val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
+
+ override val offerResult get() = this
+ override val pollResult get() = this
+ override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED
+ override fun completeResumeSend(token: Any) { check(token === CLOSE_RESUMED) }
+ override fun tryResumeReceive(value: E, idempotent: Any?): Any? = throw sendException
+ override fun completeResumeReceive(token: Any) = throw sendException
+ override fun toString(): String = "Closed[$closeCause]"
+}
+
+private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
+ override val offerResult get() = OFFER_SUCCESS
+ abstract fun resumeReceiveClosed(closed: Closed<*>)
+}
+
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
new file mode 100644
index 0000000..128e500
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
@@ -0,0 +1,346 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.ALREADY_SELECTED
+import kotlinx.coroutines.experimental.selects.SelectInstance
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.locks.ReentrantLock
+import kotlin.concurrent.withLock
+
+/**
+ * Broadcast channel with array buffer of a fixed [capacity].
+ * Sender suspends only when buffer is fully due to one of the receives not being late and
+ * receiver suspends only when buffer is empty.
+ *
+ * Note, that elements that are sent to the broadcast channel while there are no [open] subscribers are immediately
+ * lost.
+ *
+ * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
+ * The lock at each subscription is also used to manage concurrent attempts to receive from the same subscriber.
+ * The lists of suspended senders or receivers are lock-free.
+ */
+class ArrayBroadcastChannel<E>(
+ /**
+ * Buffer capacity.
+ */
+ val capacity: Int
+) : AbstractSendChannel<E>(), BroadcastChannel<E> {
+ init {
+ check(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
+ }
+
+ private val bufferLock = ReentrantLock()
+ private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity) // guarded by lock
+
+ // head & tail are Long (64 bits) and we assume that they never wrap around
+ // head, tail, and size are guarded by bufferLock
+ @Volatile
+ private var head: Long = 0 // do modulo on use of head
+ @Volatile
+ private var tail: Long = 0 // do modulo on use of tail
+ @Volatile
+ private var size: Int = 0
+
+ private val subs = CopyOnWriteArrayList<Subscriber<E>>()
+
+ override val isBufferAlwaysFull: Boolean get() = false
+ override val isBufferFull: Boolean get() = size >= capacity
+
+ override fun open(): SubscriptionReceiveChannel<E> {
+ val sub = Subscriber(this, head)
+ subs.add(sub)
+ // between creating and adding of subscription into the list the buffer head could have been bumped past it,
+ // so here we check if it did happen and update the head in subscription in this case
+ // we did not leak newly created subscription yet, so its subHead cannot update
+ val head = this.head // volatile read after sub was added to subs
+ if (head != sub.subHead) {
+ // needs update
+ sub.subHead = head
+ updateHead() // and also must recompute head of the buffer
+ }
+ return sub
+ }
+
+ override fun close(cause: Throwable?): Boolean {
+ if (!super.close(cause)) return false
+ checkSubOffers()
+ return true
+ }
+
+ // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
+ override fun offerInternal(element: E): Any {
+ bufferLock.withLock {
+ // check if closed for send (under lock, so size cannot change)
+ closedForSend?.let { return it }
+ val size = this.size
+ if (size >= capacity) return OFFER_FAILED
+ val tail = this.tail
+ buffer[(tail % capacity).toInt()] = element
+ this.size = size + 1
+ this.tail = tail + 1
+ }
+ // if offered successfully, then check subs outside of lock
+ checkSubOffers()
+ return OFFER_SUCCESS
+ }
+
+ // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
+ override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+ bufferLock.withLock {
+ // check if closed for send (under lock, so size cannot change)
+ closedForSend?.let { return it }
+ val size = this.size
+ if (size >= capacity) return OFFER_FAILED
+ // let's try to select sending this element to buffer
+ if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
+ return ALREADY_SELECTED
+ }
+ val tail = this.tail
+ buffer[(tail % capacity).toInt()] = element
+ this.size = size + 1
+ this.tail = tail + 1
+ }
+ // if offered successfully, then check subs outside of lock
+ checkSubOffers()
+ return OFFER_SUCCESS
+ }
+
+ private fun closeSubscriber(sub: Subscriber<E>) {
+ subs.remove(sub)
+ if (head == sub.subHead)
+ updateHead()
+ }
+
+ private fun checkSubOffers() {
+ var updated = false
+ @Suppress("LoopToCallChain") // must invoke `checkOffer` on every sub
+ for (sub in subs) {
+ if (sub.checkOffer()) updated = true
+ }
+ if (updated)
+ updateHead()
+ }
+
+ private fun updateHead() {
+ // compute minHead w/o lock (it will be eventually consistent)
+ val minHead = computeMinHead()
+ // update head in a loop
+ while (true) {
+ var send: Send? = null
+ var token: Any? = null
+ bufferLock.withLock {
+ val tail = this.tail
+ var head = this.head
+ val targetHead = minHead.coerceAtMost(tail)
+ if (targetHead <= head) return // nothing to do -- head was already moved
+ var size = this.size
+ // clean up removed (on not need if we don't have any subscribers anymore)
+ while (head < targetHead) {
+ buffer[(head % capacity).toInt()] = null
+ val wasFull = size >= capacity
+ // update the size before checking queue (no more senders can queue up)
+ this.head = ++head
+ this.size = --size
+ if (wasFull) {
+ while (true) {
+ send = takeFirstSendOrPeekClosed() ?: break // when when no sender
+ if (send is Closed<*>) break // break when closed for send
+ token = send!!.tryResumeSend(idempotent = null)
+ if (token != null) {
+ // put sent element to the buffer
+ buffer[(tail % capacity).toInt()] = (send as Send).pollResult
+ this.size = size + 1
+ this.tail = tail + 1
+ return@withLock // go out of lock to wakeup this sender
+ }
+ }
+ }
+ }
+ return // done updating here -> return
+ }
+ // we only get out of the lock normally when there is a sender to resume
+ send!!.completeResumeSend(token!!)
+ // since we've just sent an element, we might need to resume some receivers
+ checkSubOffers()
+ }
+ }
+
+ private fun computeMinHead(): Long {
+ var minHead = Long.MAX_VALUE
+ for (sub in subs)
+ minHead = minHead.coerceAtMost(sub.subHead) // volatile (atomic) reads of subHead
+ return minHead
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun elementAt(index: Long): E = buffer[(index % capacity).toInt()] as E
+
+ private class Subscriber<E>(
+ private val broadcastChannel: ArrayBroadcastChannel<E>,
+ @Volatile @JvmField var subHead: Long // guarded by lock
+ ) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
+ private val lock = ReentrantLock()
+
+ override val isBufferAlwaysEmpty: Boolean get() = false
+ override val isBufferEmpty: Boolean get() = subHead >= broadcastChannel.tail
+ override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
+ override val isBufferFull: Boolean get() = error("Should not be used")
+
+ override fun close() {
+ if (close(cause = null))
+ broadcastChannel.closeSubscriber(this)
+ }
+
+ // returns true if subHead was updated and broadcast channel's head must be checked
+ // this method is lock-free (it never waits on lock)
+ @Suppress("UNCHECKED_CAST")
+ fun checkOffer(): Boolean {
+ var updated = false
+ var closed: Closed<*>? = null
+ loop@
+ while (needsToCheckOfferWithoutLock()) {
+ // just use `tryLock` here and break when some other thread is checking under lock
+ // it means that `checkOffer` must be retried after every `unlock`
+ if (!lock.tryLock()) break
+ val receive: ReceiveOrClosed<E>?
+ val token: Any?
+ try {
+ val result = peekUnderLock()
+ when {
+ result === POLL_FAILED -> continue@loop // must retest `needsToCheckOfferWithoutLock` outside of the lock
+ result is Closed<*> -> {
+ closed = result
+ break@loop // was closed
+ }
+ }
+ // find a receiver for an element
+ receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving
+ if (receive is Closed<*>) break // noting more to do if this sub already closed
+ token = receive.tryResumeReceive(result as E, idempotent = null)
+ if (token == null) continue // bail out here to next iteration (see for next receiver)
+ val subHead = this.subHead
+ this.subHead = subHead + 1 // retrieved element for this subscriber
+ updated = true
+ } finally {
+ lock.unlock()
+ }
+ receive!!.completeResumeReceive(token!!)
+ }
+ // do close outside of lock if needed
+ closed?.also { close(cause = it.closeCause) }
+ return updated
+ }
+
+ // result is `E | POLL_FAILED | Closed`
+ override fun pollInternal(): Any? {
+ var updated = false
+ val result: Any?
+ lock.lock()
+ try {
+ result = peekUnderLock()
+ when {
+ result is Closed<*> -> { /* just bail out of lock */ }
+ result === POLL_FAILED -> { /* just bail out of lock */ }
+ else -> {
+ // update subHead after retrieiving element from buffer
+ val subHead = this.subHead
+ this.subHead = subHead + 1
+ updated = true
+ }
+ }
+ } finally {
+ lock.unlock()
+ }
+ // do close outside of lock
+ (result as? Closed<*>)?.also { close(cause = it.closeCause) }
+ // there could have been checkOffer attempt while we were holding lock
+ // now outside the lock recheck if anything else to offer
+ if (checkOffer())
+ updated = true
+ // and finally update broadcast's channel head if needed
+ if (updated)
+ broadcastChannel.updateHead()
+ return result
+ }
+
+ // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+ override fun pollSelectInternal(select: SelectInstance<*>): Any? {
+ var updated = false
+ var result: Any?
+ lock.lock()
+ try {
+ result = peekUnderLock()
+ when {
+ result is Closed<*> -> { /* just bail out of lock */ }
+ result === POLL_FAILED -> { /* just bail out of lock */ }
+ else -> {
+ // let's try to select receiving this element from buffer
+ if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
+ result = ALREADY_SELECTED
+ } else {
+ // update subHead after retrieiving element from buffer
+ val subHead = this.subHead
+ this.subHead = subHead + 1
+ updated = true
+ }
+ }
+ }
+ } finally {
+ lock.unlock()
+ }
+ // do close outside of lock
+ (result as? Closed<*>)?.also { close(cause = it.closeCause) }
+ // there could have been checkOffer attempt while we were holding lock
+ // now outside the lock recheck if anything else to offer
+ if (checkOffer())
+ updated = true
+ // and finally update broadcast's channel head if needed
+ if (updated)
+ broadcastChannel.updateHead()
+ return result
+ }
+
+ // Must invoke this check this after lock, because offer's invocation of `checkOffer` might have failed
+ // to `tryLock` just before the lock was about to unlocked, thus loosing notification to this
+ // subscription about an element that was just offered
+ private fun needsToCheckOfferWithoutLock(): Boolean {
+ if (closedForReceive != null)
+ return false // already closed -> nothing to do
+ if (isBufferEmpty && broadcastChannel.closedForReceive == null)
+ return false // no data for us && broadcast channel was not closed yet -> nothing to do
+ return true // check otherwise
+ }
+
+ // guarded by lock, returns:
+ // E - the element from the buffer at subHead
+ // Closed<*> when closed;
+ // POLL_FAILED when there seems to be no data, but must retest `needsToCheckOfferWithoutLock` outside of lock
+ private fun peekUnderLock(): Any? {
+ val subHead = this.subHead // guarded read (can be non-volatile read)
+ // note: from the broadcastChannel we must read closed token first, then read its tail
+ // because it is Ok if tail moves in between the reads (we make decision based on tail first)
+ val closed = broadcastChannel.closedForReceive // unguarded volatile read
+ val tail = broadcastChannel.tail // unguarded volatile read
+ if (subHead >= tail) {
+ // no elements to poll from the queue -- check if closed
+ return closed ?: POLL_FAILED // must retest `needsToCheckOfferWithoutLock` outside of the lock
+ }
+ return broadcastChannel.elementAt(subHead)
+ }
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
index c60fe63..6a7afc7 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -19,6 +19,7 @@
import kotlinx.coroutines.experimental.ALREADY_SELECTED
import kotlinx.coroutines.experimental.selects.SelectInstance
import java.util.concurrent.locks.ReentrantLock
+import kotlin.concurrent.withLock
/**
* Channel with array buffer of a fixed [capacity].
@@ -43,12 +44,6 @@
@Volatile
private var size: Int = 0
- private inline fun <T> locked(block: () -> T): T {
- lock.lock()
- return try { block() }
- finally { lock.unlock() }
- }
-
protected final override val isBufferAlwaysEmpty: Boolean get() = false
protected final override val isBufferEmpty: Boolean get() = size == 0
protected final override val isBufferAlwaysFull: Boolean get() = false
@@ -58,7 +53,7 @@
protected override fun offerInternal(element: E): Any {
var receive: ReceiveOrClosed<E>? = null
var token: Any? = null
- locked {
+ lock.withLock {
val size = this.size
closedForSend?.let { return it }
if (size < capacity) {
@@ -75,7 +70,7 @@
token = receive!!.tryResumeReceive(element, idempotent = null)
if (token != null) {
this.size = size // restore size
- return@locked
+ return@withLock
}
}
}
@@ -90,11 +85,11 @@
return receive!!.offerResult
}
- // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
+ // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
var receive: ReceiveOrClosed<E>? = null
var token: Any? = null
- locked {
+ lock.withLock {
val size = this.size
closedForSend?.let { return it }
if (size < capacity) {
@@ -111,7 +106,7 @@
receive = offerOp.result
token = offerOp.resumeToken
check(token != null)
- return@locked
+ return@withLock
}
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
failure === ALREADY_SELECTED || failure is Closed<*> -> {
@@ -123,7 +118,7 @@
}
}
// let's try to select sending this element to buffer
- if (!select.trySelect(null)) {
+ if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
this.size = size // restore size
return ALREADY_SELECTED
}
@@ -143,9 +138,9 @@
var send: Send? = null
var token: Any? = null
var result: Any? = null
- locked {
+ lock.withLock {
val size = this.size
- if (size == 0) return closedForSend ?: POLL_FAILED
+ if (size == 0) return closedForSend ?: POLL_FAILED // when nothing can be read from buffer
// size > 0: not empty -- retrieve element
result = buffer[head]
buffer[head] = null
@@ -162,7 +157,7 @@
}
}
}
- if (replacement !== POLL_FAILED && !isClosed(replacement)) {
+ if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
this.size = size // restore size
buffer[(head + size) % capacity] = replacement
}
@@ -179,7 +174,7 @@
var send: Send? = null
var token: Any? = null
var result: Any? = null
- locked {
+ lock.withLock {
val size = this.size
if (size == 0) return closedForSend ?: POLL_FAILED
// size > 0: not empty -- retrieve element
@@ -216,12 +211,12 @@
}
}
}
- if (replacement !== POLL_FAILED && !isClosed(replacement)) {
+ if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
this.size = size // restore size
buffer[(head + size) % capacity] = replacement
} else {
// failed to poll or is already closed --> let's try to select receiving this element from buffer
- if (!select.trySelect(null)) {
+ if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
this.size = size // restore size
buffer[head] = result // restore head
return ALREADY_SELECTED
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
index d34f865..177715b 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
@@ -16,6 +16,7 @@
package kotlinx.coroutines.experimental.internal
-internal class Symbol(val symbol: String) {
+/** @suppress **This is unstable API and it is subject to change.** */
+public class Symbol(val symbol: String) {
override fun toString(): String = symbol
}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
new file mode 100644
index 0000000..bc9e533
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.hamcrest.core.IsEqual
+import org.hamcrest.core.IsNull
+import org.junit.Assert.*
+import org.junit.Test
+
+class ArrayBroadcastChannelTest : TestBase() {
+ @Test
+ fun testBasic() = runBlocking<Unit> {
+ expect(1)
+ val broadcast = ArrayBroadcastChannel<Int>(1)
+ assertThat(broadcast.isClosedForSend, IsEqual(false))
+ val first = broadcast.open()
+ launch(context, CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ assertThat(first.receive(), IsEqual(1)) // suspends
+ assertThat(first.isClosedForReceive, IsEqual(false))
+ expect(5)
+ assertThat(first.receive(), IsEqual(2)) // suspends
+ assertThat(first.isClosedForReceive, IsEqual(false))
+ expect(10)
+ assertThat(first.receiveOrNull(), IsNull()) // suspends
+ assertThat(first.isClosedForReceive, IsEqual(true))
+ expect(14)
+ }
+ expect(3)
+ broadcast.send(1)
+ expect(4)
+ yield() // to the first receiver
+ expect(6)
+ val second = broadcast.open()
+ launch(context, CoroutineStart.UNDISPATCHED) {
+ expect(7)
+ assertThat(second.receive(), IsEqual(2)) // suspends
+ assertThat(second.isClosedForReceive, IsEqual(false))
+ expect(11)
+ assertThat(second.receiveOrNull(), IsNull()) // suspends
+ assertThat(second.isClosedForReceive, IsEqual(true))
+ expect(15)
+ }
+ expect(8)
+ broadcast.send(2)
+ expect(9)
+ yield() // to first & second receivers
+ expect(12)
+ broadcast.close()
+ expect(13)
+ assertThat(broadcast.isClosedForSend, IsEqual(true))
+ yield() // to first & second receivers
+ finish(16)
+ }
+
+ @Test
+ fun testSendSuspend() = runBlocking {
+ expect(1)
+ val broadcast = ArrayBroadcastChannel<Int>(1)
+ val first = broadcast.open()
+ launch(context) {
+ expect(4)
+ assertThat(first.receive(), IsEqual(1))
+ expect(5)
+ assertThat(first.receive(), IsEqual(2))
+ expect(6)
+ }
+ expect(2)
+ broadcast.send(1) // puts to buffer, receiver not running yet
+ expect(3)
+ broadcast.send(2) // suspends
+ finish(7)
+ }
+
+ @Test
+ fun testConcurrentSendCompletion() = runBlocking {
+ expect(1)
+ val broadcast = ArrayBroadcastChannel<Int>(1)
+ val sub = broadcast.open()
+ // launch 3 concurrent senders (one goes buffer, two other suspend)
+ for (x in 1..3) {
+ launch(context, CoroutineStart.UNDISPATCHED) {
+ expect(x + 1)
+ broadcast.send(x)
+ }
+ }
+ // and close it for send
+ expect(5)
+ broadcast.close()
+ // now must receive all 3 items
+ expect(6)
+ assertThat(sub.isClosedForReceive, IsEqual(false))
+ for (x in 1..3)
+ assertThat(sub.receiveOrNull(), IsEqual(x))
+ // and receive close signal
+ assertThat(sub.receiveOrNull(), IsNull())
+ assertThat(sub.isClosedForReceive, IsEqual(true))
+ finish(7)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
index 93df270..e595f71 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
@@ -99,7 +99,7 @@
println(" Duplicated $dupCnt ints")
failed.get()?.let { throw it }
assertEquals(0, dupCnt)
- if (kind != TestChannelKind.CONFLATED) {
+ if (!kind.isConflated) {
assertEquals(0, missedCnt)
assertEquals(lastSent, lastReceived)
}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
index e2ca9c0..f3e9a89 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
@@ -46,7 +46,7 @@
val timeLimit = 30_000L * stressTestMultiplier // 30 sec
val nEvents = 1_000_000 * stressTestMultiplier
- val maxBuffer = 10_000 // artifical limit for LinkedListChannel
+ val maxBuffer = 10_000 // artificial limit for LinkedListChannel
val channel = kind.create()
val sendersCompleted = AtomicInteger()
@@ -112,7 +112,7 @@
assertEquals(nReceivers, receiversCompleted.get())
assertEquals(0, dupes.get())
assertEquals(nEvents, sentTotal.get())
- if (kind != TestChannelKind.CONFLATED) assertEquals(nEvents, receivedTotal.get())
+ if (!kind.isConflated) assertEquals(nEvents, receivedTotal.get())
repeat(nReceivers) { receiveIndex ->
assertTrue("Each receiver should have received something", receivedBy[receiveIndex] > 0)
}
@@ -120,7 +120,7 @@
private suspend fun doSent() {
sentTotal.incrementAndGet()
- if (kind != TestChannelKind.CONFLATED) {
+ if (!kind.isConflated) {
while (sentTotal.get() > receivedTotal.get() + maxBuffer)
yield() // throttle fast senders to prevent OOM with LinkedListChannel
}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
index 5021cbc..32cde4a 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
@@ -29,14 +29,17 @@
@RunWith(Parameterized::class)
class SimpleSendReceiveTest(
val kind: TestChannelKind,
- val n: Int
+ val n: Int,
+ val concurrent: Boolean
) {
companion object {
- @Parameterized.Parameters(name = "{0}, n={1}")
+ @Parameterized.Parameters(name = "{0}, n={1}, concurrent={2}")
@JvmStatic
fun params(): Collection<Array<Any>> = TestChannelKind.values().flatMap { kind ->
- listOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100, 1000).map { n ->
- arrayOf<Any>(kind, n)
+ listOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100, 1000).flatMap { n ->
+ listOf(false, true).map { concurrent ->
+ arrayOf<Any>(kind, n, concurrent)
+ }
}
}
}
@@ -45,20 +48,23 @@
@Test
fun testSimpleSendReceive() = runBlocking {
- launch(CommonPool) {
+ val ctx = if (concurrent) CommonPool else context
+ launch(ctx) {
repeat(n) { channel.send(it) }
channel.close()
}
var expected = 0
for (x in channel) {
- if (kind != TestChannelKind.CONFLATED) {
+ if (!kind.isConflated) {
assertThat(x, IsEqual(expected++))
} else {
assertTrue(x >= expected)
expected = x + 1
}
}
- if (kind != TestChannelKind.CONFLATED) {
+ if (kind.isConflated) {
+ if (n > 0) assertTrue(expected > 0)
+ } else {
assertThat(expected, IsEqual(n))
}
}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
index 2a12b03..81e8f7d 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -16,6 +16,8 @@
package kotlinx.coroutines.experimental.channels
+import kotlinx.coroutines.experimental.selects.SelectInstance
+
enum class TestChannelKind {
RENDEZVOUS {
override fun create(): Channel<Int> = RendezvousChannel<Int>()
@@ -36,8 +38,40 @@
CONFLATED {
override fun create(): Channel<Int> = ConflatedChannel<Int>()
override fun toString(): String = "ConflatedChannel"
+ override val isConflated: Boolean get() = true
+ },
+ ARRAY_BROADCAST_1 {
+ override fun create(): Channel<Int> = ChannelViaBroadcast(ArrayBroadcastChannel<Int>(1))
+ override fun toString(): String = "ArrayBroadcastChannel(1)"
+ },
+ ARRAY_BROADCAST_10 {
+ override fun create(): Channel<Int> = ChannelViaBroadcast(ArrayBroadcastChannel<Int>(10))
+ override fun toString(): String = "ArrayBroadcastChannel(10)"
+ },
+ CONFLATED_BROADCAST {
+ override fun create(): Channel<Int> = ChannelViaBroadcast(ConflatedBroadcastChannel<Int>())
+ override fun toString(): String = "ConflatedBroadcastChannel"
+ override val isConflated: Boolean get() = true
}
;
abstract fun create(): Channel<Int>
+ open val isConflated: Boolean get() = false
+}
+
+private class ChannelViaBroadcast<E>(
+ private val broadcast: BroadcastChannel<E>
+): Channel<E>, SendChannel<E> by broadcast {
+ val sub = broadcast.open()
+
+ override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
+ override val isEmpty: Boolean get() = sub.isEmpty
+ suspend override fun receive(): E = sub.receive()
+ suspend override fun receiveOrNull(): E? = sub.receiveOrNull()
+ override fun poll(): E? = sub.poll()
+ override fun iterator(): ChannelIterator<E> = sub.iterator()
+ override fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) =
+ sub.registerSelectReceive(select, block)
+ override fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) =
+ sub.registerSelectReceiveOrNull(select, block)
}
diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md
index b8bf0b7..3e49212 100644
--- a/reactive/coroutines-guide-reactive.md
+++ b/reactive/coroutines-guide-reactive.md
@@ -384,7 +384,7 @@
effectively broadcasts elements to all its subscribers. The matching concept in coroutines world is called a
[BroadcastChannel]. There is a variety of subjects in Rx with
[BehaviorSubject](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/BehaviorSubject.html) being the
-most useful one to manage state:
+the one used to manage state:
<!--- INCLUDE
import io.reactivex.subjects.BehaviorSubject
@@ -528,6 +528,12 @@
four
```
+Another implementation of [BroadcastChannel] is [ArrayBroadcastChannel]. It delivers every event to every
+subscriber since the moment the corresponding subscription is open. It corresponds to
+[PublishSubject][http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/PublishSubject.html] in Rx.
+The capacity of the buffer in the constructor of `ArrayBroadcastChannel` controls the numbers of elements
+that can be sent before the sender is suspended waiting for receiver to receive those elements.
+
<!--- TEST -->
## Operators
@@ -1056,6 +1062,7 @@
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
[BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-broadcast-channel/index.html
[ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-conflated-broadcast-channel/index.html
+[ArrayBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-array-broadcast-channel/index.html
<!--- INDEX kotlinx.coroutines.experimental.selects -->
[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
[whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/while-select.html