Migrate channels and related operators to common, so channels can be used from JS
Fixes #201
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt
index e2e67d3..6759e8f 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt
@@ -18,7 +18,7 @@
public expect class CompletionHandlerException(message: String, cause: Throwable) : RuntimeException
-public expect open class CancellationException(message: String) : IllegalStateException
+public expect open class CancellationException(message: String?) : IllegalStateException
public expect class JobCancellationException(
message: String,
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
new file mode 100644
index 0000000..5fcfc87
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -0,0 +1,1021 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.internalAnnotations.*
+import kotlinx.coroutines.experimental.intrinsics.*
+import kotlinx.coroutines.experimental.selects.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * Abstract send channel. It is a base class for all send channel implementations.
+ */
+public abstract class AbstractSendChannel<E> : SendChannel<E> {
+ /** @suppress **This is unstable API and it is subject to change.** */
+ protected val queue = LockFreeLinkedListHead()
+
+ // ------ extension points for buffered channels ------
+
+ /**
+ * Returns `true` if [isBufferFull] is always `true`.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected abstract val isBufferAlwaysFull: Boolean
+
+ /**
+ * Returns `true` if this channel's buffer is full.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected abstract val isBufferFull: Boolean
+
+ // ------ internal functions for override by buffered channels ------
+
+ /**
+ * Tries to add element to buffer or to queued receiver.
+ * Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected open fun offerInternal(element: E): Any {
+ while (true) {
+ val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
+ val token = receive.tryResumeReceive(element, idempotent = null)
+ if (token != null) {
+ receive.completeResumeReceive(token)
+ return receive.offerResult
+ }
+ }
+ }
+
+ /**
+ * Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
+ * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+ // offer atomically with select
+ val offerOp = describeTryOffer(element)
+ val failure = select.performAtomicTrySelect(offerOp)
+ if (failure != null) return failure
+ val receive = offerOp.result
+ receive.completeResumeReceive(offerOp.resumeToken!!)
+ return receive.offerResult
+ }
+
+ // ------ state functions & helpers for concrete implementations ------
+
+ /**
+ * Returns non-null closed token if it is last in the queue.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected val closedForSend: Closed<*>? get() = queue.prev as? Closed<*>
+
+ /**
+ * Returns non-null closed token if it is first in the queue.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected val closedForReceive: Closed<*>? get() = queue.next as? Closed<*>
+
+ /**
+ * Retrieves first sending waiter from the queue or returns closed token.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected fun takeFirstSendOrPeekClosed(): Send? =
+ queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
+
+ /**
+ * Queues buffered element, returns null on success or
+ * returns node reference if it was already closed or is waiting for receive.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
+ queue.addLastIfPrev(SendBuffered(element), { prev ->
+ if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
+ true
+ })
+ return null
+ }
+
+ /**
+ * Queues conflated element, returns null on success or
+ * returns node reference if it was already closed or is waiting for receive.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected fun sendConflated(element: E): ReceiveOrClosed<*>? {
+ val node = SendBuffered(element)
+ queue.addLastIfPrev(node, { prev ->
+ if (prev is ReceiveOrClosed<*>) return@sendConflated prev
+ true
+ })
+ conflatePreviousSendBuffered(node)
+ return null
+ }
+
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
+ val prev = node.prev
+ (prev as? SendBuffered<*>)?.remove()
+ }
+
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected fun describeSendBuffered(element: E): AddLastDesc<*> = SendBufferedDesc(queue, element)
+
+ private open class SendBufferedDesc<E>(
+ queue: LockFreeLinkedListHead,
+ element: E
+ ) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
+ override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+ if (affected is ReceiveOrClosed<*>) return OFFER_FAILED
+ return null
+ }
+ }
+
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected fun describeSendConflated(element: E): AddLastDesc<*> = SendConflatedDesc(queue, element)
+
+ private class SendConflatedDesc<E>(
+ queue: LockFreeLinkedListHead,
+ element: E
+ ) : SendBufferedDesc<E>(queue, element) {
+ override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
+ super.finishOnSuccess(affected, next)
+ // remove previous SendBuffered
+ (affected as? SendBuffered<*>)?.remove()
+ }
+ }
+
+ // ------ SendChannel ------
+
+ public final override val isClosedForSend: Boolean get() = closedForSend != null
+ public final override val isFull: Boolean get() = queue.next !is ReceiveOrClosed<*> && isBufferFull
+
+ public final override suspend fun send(element: E) {
+ // fast path -- try offer non-blocking
+ if (offer(element)) return
+ // slow-path does suspend
+ return sendSuspend(element)
+ }
+
+ public final override fun offer(element: E): Boolean {
+ val result = offerInternal(element)
+ return when {
+ result === OFFER_SUCCESS -> true
+ result === OFFER_FAILED -> false
+ result is Closed<*> -> throw result.sendException
+ else -> error("offerInternal returned $result")
+ }
+ }
+
+ private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
+ val send = SendElement(element, cont)
+ loop@ while (true) {
+ val enqueueResult = enqueueSend(send)
+ when (enqueueResult) {
+ null -> { // enqueued successfully
+ cont.initCancellability() // make it properly cancellable
+ cont.removeOnCancel(send)
+ return@sc
+ }
+ is Closed<*> -> {
+ cont.resumeWithException(enqueueResult.sendException)
+ return@sc
+ }
+ }
+ // hm... receiver is waiting or buffer is not full. try to offer
+ val offerResult = offerInternal(element)
+ when {
+ offerResult === OFFER_SUCCESS -> {
+ cont.resume(Unit)
+ return@sc
+ }
+ offerResult === OFFER_FAILED -> continue@loop
+ offerResult is Closed<*> -> {
+ cont.resumeWithException(offerResult.sendException)
+ return@sc
+ }
+ else -> error("offerInternal returned $offerResult")
+ }
+ }
+ }
+
+ /**
+ * Result is:
+ * * null -- successfully enqueued
+ * * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
+ * * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
+ */
+ private fun enqueueSend(send: SendElement): Any? {
+ if (isBufferAlwaysFull) {
+ queue.addLastIfPrev(send, { prev ->
+ if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
+ true
+ })
+ } else {
+ if (!queue.addLastIfPrevAndIf(send, { prev ->
+ if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
+ true
+ }, { isBufferFull }))
+ return ENQUEUE_FAILED
+ }
+ return null
+ }
+
+ public override fun close(cause: Throwable?): Boolean {
+ val closed = Closed<E>(cause)
+ while (true) {
+ val receive = takeFirstReceiveOrPeekClosed()
+ if (receive == null) {
+ // queue empty or has only senders -- try add last "Closed" item to the queue
+ if (queue.addLastIfPrev(closed, { prev ->
+ if (prev is Closed<*>) return false // already closed
+ prev !is ReceiveOrClosed<*> // only add close if no waiting receive
+ })) {
+ onClosed(closed)
+ afterClose(cause)
+ return true
+ }
+ continue // retry on failure
+ }
+ if (receive is Closed<*>) return false // already marked as closed -- nothing to do
+ receive as Receive<E> // type assertion
+ receive.resumeReceiveClosed(closed)
+ }
+ }
+
+ /**
+ * Invoked when [Closed] element was just added.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected open fun onClosed(closed: Closed<E>) {}
+
+ /**
+ * Invoked after successful [close].
+ */
+ protected open fun afterClose(cause: Throwable?) {}
+
+ /**
+ * Retrieves first receiving waiter from the queue or returns closed token.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected open fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
+ queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ it is Closed<*> })
+
+ // ------ registerSelectSend ------
+
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
+
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected class TryOfferDesc<E>(
+ @JvmField val element: E,
+ queue: LockFreeLinkedListHead
+ ) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
+ @JvmField var resumeToken: Any? = null
+
+ override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+ if (affected !is ReceiveOrClosed<*>) return OFFER_FAILED
+ if (affected is Closed<*>) return affected
+ return null
+ }
+
+ override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
+ val token = node.tryResumeReceive(element, idempotent = this) ?: return false
+ resumeToken = token
+ return true
+ }
+ }
+
+ private inner class TryEnqueueSendDesc<R>(
+ element: E,
+ select: SelectInstance<R>,
+ block: suspend (SendChannel<E>) -> R
+ ) : AddLastDesc<SendSelect<E, R>>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) {
+ override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+ if (affected is ReceiveOrClosed<*>) {
+ return affected as? Closed<*> ?: ENQUEUE_FAILED
+ }
+ return null
+ }
+
+ override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
+ if (!isBufferFull) return ENQUEUE_FAILED
+ return super.onPrepare(affected, next)
+ }
+
+ override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
+ super.finishOnSuccess(affected, next)
+ // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
+ node.disposeOnSelect()
+ }
+ }
+
+ final override val onSend: SelectClause2<E, SendChannel<E>>
+ get() = object : SelectClause2<E, SendChannel<E>> {
+ override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
+ registerSelectSend(select, param, block)
+ }
+ }
+
+ private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
+ while (true) {
+ if (select.isSelected) return
+ if (isFull) {
+ val enqueueOp = TryEnqueueSendDesc(element, select, block)
+ val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+ when {
+ enqueueResult === ALREADY_SELECTED -> return
+ enqueueResult === ENQUEUE_FAILED -> {} // retry
+ enqueueResult is Closed<*> -> throw enqueueResult.sendException
+ else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
+ }
+ } else {
+ val offerResult = offerSelectInternal(element, select)
+ when {
+ offerResult === ALREADY_SELECTED -> return
+ offerResult === OFFER_FAILED -> {} // retry
+ offerResult === OFFER_SUCCESS -> {
+ block.startCoroutineUndispatched(receiver = this, completion = select.completion)
+ return
+ }
+ offerResult is Closed<*> -> throw offerResult.sendException
+ else -> error("offerSelectInternal returned $offerResult")
+ }
+ }
+ }
+ }
+
+ // ------ debug ------
+
+ public override fun toString() =
+ "$classSimpleName@$hexAddress{$queueDebugStateString}$bufferDebugString"
+
+ private val queueDebugStateString: String
+ get() {
+ val head = queue.next
+ if (head === queue) return "EmptyQueue"
+ var result = when (head) {
+ is Closed<*> -> head.toString()
+ is Receive<*> -> "ReceiveQueued"
+ is Send -> "SendQueued"
+ else -> "UNEXPECTED:$head" // should not happen
+ }
+ val tail = queue.prev
+ if (tail !== head) {
+ result += ",queueSize=${countQueueSize()}"
+ if (tail is Closed<*>) result += ",closedForSend=$tail"
+ }
+ return result
+ }
+
+ private fun countQueueSize(): Int {
+ var size = 0
+ queue.forEach<LockFreeLinkedListNode> { size++ }
+ return size
+ }
+
+ protected open val bufferDebugString: String get() = ""
+
+ // ------ private ------
+
+ private class SendSelect<E, R>(
+ override val pollResult: Any?,
+ @JvmField val channel: SendChannel<E>,
+ @JvmField val select: SelectInstance<R>,
+ @JvmField val block: suspend (SendChannel<E>) -> R
+ ) : LockFreeLinkedListNode(), Send, DisposableHandle {
+ override fun tryResumeSend(idempotent: Any?): Any? =
+ if (select.trySelect(idempotent)) SELECT_STARTED else null
+
+ override fun completeResumeSend(token: Any) {
+ check(token === SELECT_STARTED)
+ block.startCoroutine(receiver = channel, completion = select.completion)
+ }
+
+ fun disposeOnSelect() {
+ select.disposeOnSelect(this)
+ }
+
+ override fun dispose() {
+ remove()
+ }
+
+ override fun resumeSendClosed(closed: Closed<*>) {
+ if (select.trySelect(null))
+ select.resumeSelectCancellableWithException(closed.sendException)
+ }
+
+ override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
+ }
+
+ private class SendBuffered<out E>(
+ @JvmField val element: E
+ ) : LockFreeLinkedListNode(), Send {
+ override val pollResult: Any? get() = element
+ override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
+ override fun completeResumeSend(token: Any) { check(token === SEND_RESUMED) }
+ override fun resumeSendClosed(closed: Closed<*>) {}
+ }
+}
+
+/**
+ * Abstract send/receive channel. It is a base class for all channel implementations.
+ */
+public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E> {
+ // ------ extension points for buffered channels ------
+
+ /**
+ * Returns `true` if [isBufferEmpty] is always `true`.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected abstract val isBufferAlwaysEmpty: Boolean
+
+ /**
+ * Returns `true` if this channel's buffer is empty.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected abstract val isBufferEmpty: Boolean
+
+ // ------ internal functions for override by buffered channels ------
+
+ /**
+ * Tries to remove element from buffer or from queued sender.
+ * Return type is `E | POLL_FAILED | Closed`
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected open fun pollInternal(): Any? {
+ while (true) {
+ val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
+ val token = send.tryResumeSend(idempotent = null)
+ if (token != null) {
+ send.completeResumeSend(token)
+ return send.pollResult
+ }
+ }
+ }
+
+ /**
+ * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
+ * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
+ // poll atomically with select
+ val pollOp = describeTryPoll()
+ val failure = select.performAtomicTrySelect(pollOp)
+ if (failure != null) return failure
+ val send = pollOp.result
+ send.completeResumeSend(pollOp.resumeToken!!)
+ return pollOp.pollResult
+ }
+
+ // ------ state functions & helpers for concrete implementations ------
+
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected val hasReceiveOrClosed: Boolean get() = queue.next is ReceiveOrClosed<*>
+
+ // ------ ReceiveChannel ------
+
+ public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
+ public final override val isEmpty: Boolean get() = queue.next !is Send && isBufferEmpty
+
+ @Suppress("UNCHECKED_CAST")
+ public final override suspend fun receive(): E {
+ // fast path -- try poll non-blocking
+ val result = pollInternal()
+ if (result !== POLL_FAILED) return receiveResult(result)
+ // slow-path does suspend
+ return receiveSuspend()
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun receiveResult(result: Any?): E {
+ if (result is Closed<*>) throw result.receiveException
+ return result as E
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private suspend fun receiveSuspend(): E = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
+ val receive = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
+ while (true) {
+ if (enqueueReceive(receive)) {
+ cont.initCancellability() // make it properly cancellable
+ removeReceiveOnCancel(cont, receive)
+ return@sc
+ }
+ // hm... something is not right. try to poll
+ val result = pollInternal()
+ if (result is Closed<*>) {
+ cont.resumeWithException(result.receiveException)
+ return@sc
+ }
+ if (result !== POLL_FAILED) {
+ cont.resume(result as E)
+ return@sc
+ }
+ }
+ }
+
+ private fun enqueueReceive(receive: Receive<E>): Boolean {
+ val result = if (isBufferAlwaysEmpty)
+ queue.addLastIfPrev(receive, { it !is Send }) else
+ queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
+ if (result) onReceiveEnqueued()
+ return result
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ public final override suspend fun receiveOrNull(): E? {
+ // fast path -- try poll non-blocking
+ val result = pollInternal()
+ if (result !== POLL_FAILED) return receiveOrNullResult(result)
+ // slow-path does suspend
+ return receiveOrNullSuspend()
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun receiveOrNullResult(result: Any?): E? {
+ if (result is Closed<*>) {
+ if (result.closeCause != null) throw result.closeCause
+ return null
+ }
+ return result as E
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private suspend fun receiveOrNullSuspend(): E? = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
+ val receive = ReceiveElement(cont, nullOnClose = true)
+ while (true) {
+ if (enqueueReceive(receive)) {
+ cont.initCancellability() // make it properly cancellable
+ removeReceiveOnCancel(cont, receive)
+ return@sc
+ }
+ // hm... something is not right. try to poll
+ val result = pollInternal()
+ if (result is Closed<*>) {
+ if (result.closeCause == null)
+ cont.resume(null)
+ else
+ cont.resumeWithException(result.closeCause)
+ return@sc
+ }
+ if (result !== POLL_FAILED) {
+ cont.resume(result as E)
+ return@sc
+ }
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ public final override fun poll(): E? {
+ val result = pollInternal()
+ return if (result === POLL_FAILED) null else receiveOrNullResult(result)
+ }
+
+ override fun cancel(cause: Throwable?): Boolean =
+ close(cause).also {
+ cleanupSendQueueOnCancel()
+ }
+
+ // Note: this function is invoked when channel is already closed
+ protected open fun cleanupSendQueueOnCancel() {
+ val closed = closedForSend ?: error("Cannot happen")
+ while (true) {
+ val send = takeFirstSendOrPeekClosed() ?: error("Cannot happen")
+ if (send is Closed<*>) {
+ check(send === closed)
+ return // cleaned
+ }
+ send.resumeSendClosed(closed)
+ }
+ }
+
+ public final override fun iterator(): ChannelIterator<E> = Itr(this)
+
+ // ------ registerSelectReceive ------
+
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
+
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
+ @JvmField var resumeToken: Any? = null
+ @JvmField var pollResult: E? = null
+
+ override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+ if (affected is Closed<*>) return affected
+ if (affected !is Send) return POLL_FAILED
+ return null
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ override fun validatePrepared(node: Send): Boolean {
+ val token = node.tryResumeSend(idempotent = this) ?: return false
+ resumeToken = token
+ pollResult = node.pollResult as E
+ return true
+ }
+ }
+
+ private inner class TryEnqueueReceiveDesc<E, R>(
+ select: SelectInstance<R>,
+ block: suspend (E?) -> R,
+ nullOnClose: Boolean
+ ) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, nullOnClose)) {
+ override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+ if (affected is Send) return ENQUEUE_FAILED
+ return null
+ }
+
+ override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
+ if (!isBufferEmpty) return ENQUEUE_FAILED
+ return super.onPrepare(affected, next)
+ }
+
+ override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
+ super.finishOnSuccess(affected, next)
+ // notify the there is one more receiver
+ onReceiveEnqueued()
+ // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
+ node.removeOnSelectCompletion()
+ }
+ }
+
+ final override val onReceive: SelectClause1<E>
+ get() = object : SelectClause1<E> {
+ override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
+ registerSelectReceive(select, block)
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
+ while (true) {
+ if (select.isSelected) return
+ if (isEmpty) {
+ val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
+ val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+ when {
+ enqueueResult === ALREADY_SELECTED -> return
+ enqueueResult === ENQUEUE_FAILED -> {} // retry
+ else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
+ }
+ } else {
+ val pollResult = pollSelectInternal(select)
+ when {
+ pollResult === ALREADY_SELECTED -> return
+ pollResult === POLL_FAILED -> {} // retry
+ pollResult is Closed<*> -> throw pollResult.receiveException
+ else -> {
+ block.startCoroutineUndispatched(pollResult as E, select.completion)
+ return
+ }
+ }
+ }
+ }
+ }
+
+ final override val onReceiveOrNull: SelectClause1<E?>
+ get() = object : SelectClause1<E?> {
+ override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
+ registerSelectReceiveOrNull(select, block)
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
+ while (true) {
+ if (select.isSelected) return
+ if (isEmpty) {
+ val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
+ val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+ when {
+ enqueueResult === ALREADY_SELECTED -> return
+ enqueueResult === ENQUEUE_FAILED -> {} // retry
+ else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
+ }
+ } else {
+ val pollResult = pollSelectInternal(select)
+ when {
+ pollResult === ALREADY_SELECTED -> return
+ pollResult === POLL_FAILED -> {} // retry
+ pollResult is Closed<*> -> {
+ if (pollResult.closeCause == null) {
+ if (select.trySelect(null))
+ block.startCoroutineUndispatched(null, select.completion)
+ return
+ } else
+ throw pollResult.closeCause
+ }
+ else -> {
+ // selected successfully
+ block.startCoroutineUndispatched(pollResult as E, select.completion)
+ return
+ }
+ }
+ }
+ }
+ }
+
+ // ------ protected ------
+
+ override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
+ super.takeFirstReceiveOrPeekClosed().also {
+ if (it != null && it !is Closed<*>) onReceiveDequeued()
+ }
+
+ /**
+ * Invoked when receiver is successfully enqueued to the queue of waiting receivers.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected open fun onReceiveEnqueued() {}
+
+ /**
+ * Invoked when enqueued receiver was successfully removed from the queue of waiting receivers.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ protected open fun onReceiveDequeued() {}
+
+ // ------ private ------
+
+ private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) {
+ cont.invokeOnCompletion {
+ if (cont.isCancelled && receive.remove())
+ onReceiveDequeued()
+ }
+ }
+
+ private class Itr<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
+ var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed
+
+ override suspend fun hasNext(): Boolean {
+ // check for repeated hasNext
+ if (result !== POLL_FAILED) return hasNextResult(result)
+ // fast path -- try poll non-blocking
+ result = channel.pollInternal()
+ if (result !== POLL_FAILED) return hasNextResult(result)
+ // slow-path does suspend
+ return hasNextSuspend()
+ }
+
+ private fun hasNextResult(result: Any?): Boolean {
+ if (result is Closed<*>) {
+ if (result.closeCause != null) throw result.receiveException
+ return false
+ }
+ return true
+ }
+
+ private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
+ val receive = ReceiveHasNext(this, cont)
+ while (true) {
+ if (channel.enqueueReceive(receive)) {
+ cont.initCancellability() // make it properly cancellable
+ channel.removeReceiveOnCancel(cont, receive)
+ return@sc
+ }
+ // hm... something is not right. try to poll
+ val result = channel.pollInternal()
+ this.result = result
+ if (result is Closed<*>) {
+ if (result.closeCause == null)
+ cont.resume(false)
+ else
+ cont.resumeWithException(result.receiveException)
+ return@sc
+ }
+ if (result !== POLL_FAILED) {
+ cont.resume(true)
+ return@sc
+ }
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ override suspend fun next(): E {
+ val result = this.result
+ if (result is Closed<*>) throw result.receiveException
+ if (result !== POLL_FAILED) {
+ this.result = POLL_FAILED
+ return result as E
+ }
+ // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
+ return channel.receive()
+ }
+ }
+
+ private class ReceiveElement<in E>(
+ @JvmField val cont: CancellableContinuation<E?>,
+ @JvmField val nullOnClose: Boolean
+ ) : Receive<E>() {
+ override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(value, idempotent)
+ override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+ override fun resumeReceiveClosed(closed: Closed<*>) {
+ if (closed.closeCause == null && nullOnClose)
+ cont.resume(null)
+ else
+ cont.resumeWithException(closed.receiveException)
+ }
+ override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
+ }
+
+ private class ReceiveHasNext<E>(
+ @JvmField val iterator: Itr<E>,
+ @JvmField val cont: CancellableContinuation<Boolean>
+ ) : Receive<E>() {
+ override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
+ val token = cont.tryResume(true, idempotent)
+ if (token != null) {
+ /*
+ When idempotent != null this invocation can be stale and we cannot directly update iterator.result
+ Instead, we save both token & result into a temporary IdempotentTokenValue object and
+ set iterator result only in completeResumeReceive that is going to be invoked just once
+ */
+ if (idempotent != null) return IdempotentTokenValue(token, value)
+ iterator.result = value
+ }
+ return token
+ }
+
+ override fun completeResumeReceive(token: Any) {
+ if (token is IdempotentTokenValue<*>) {
+ iterator.result = token.value
+ cont.completeResume(token.token)
+ } else
+ cont.completeResume(token)
+ }
+
+ override fun resumeReceiveClosed(closed: Closed<*>) {
+ val token = if (closed.closeCause == null)
+ cont.tryResume(false)
+ else
+ cont.tryResumeWithException(closed.receiveException)
+ if (token != null) {
+ iterator.result = closed
+ cont.completeResume(token)
+ }
+ }
+ override fun toString(): String = "ReceiveHasNext[$cont]"
+ }
+
+ private inner class ReceiveSelect<R, in E>(
+ @JvmField val select: SelectInstance<R>,
+ @JvmField val block: suspend (E?) -> R,
+ @JvmField val nullOnClose: Boolean
+ ) : Receive<E>(), DisposableHandle {
+ override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
+ if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
+
+ @Suppress("UNCHECKED_CAST")
+ override fun completeResumeReceive(token: Any) {
+ val value: E = (if (token === NULL_VALUE) null else token) as E
+ block.startCoroutine(value, select.completion)
+ }
+
+ override fun resumeReceiveClosed(closed: Closed<*>) {
+ if (select.trySelect(null)) {
+ if (closed.closeCause == null && nullOnClose) {
+ block.startCoroutine(null, select.completion)
+ } else {
+ // even though we are dispatching coroutine to process channel close on receive,
+ // which is an atomically cancellable suspending function,
+ // close is a final state, so we can use a cancellable resume mode
+ select.resumeSelectCancellableWithException(closed.receiveException)
+ }
+ }
+ }
+
+ fun removeOnSelectCompletion() {
+ select.disposeOnSelect(this)
+ }
+
+ override fun dispose() { // invoked on select completion
+ if (remove())
+ onReceiveDequeued() // notify cancellation of receive
+ }
+
+ override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
+ }
+
+ private class IdempotentTokenValue<out E>(
+ @JvmField val token: Any,
+ @JvmField val value: E
+ )
+}
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val POLL_FAILED: Any = Symbol("POLL_FAILED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val NULL_VALUE: Any = Symbol("NULL_VALUE")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
+
+/** @suppress **This is unstable API and it is subject to change.** */
+@JvmField val SEND_RESUMED = Symbol("SEND_RESUMED")
+
+/**
+ * Represents sending waiter in the queue.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public interface Send {
+ val pollResult: Any? // E | Closed
+ fun tryResumeSend(idempotent: Any?): Any?
+ fun completeResumeSend(token: Any)
+ fun resumeSendClosed(closed: Closed<*>)
+}
+
+/**
+ * Represents receiver waiter in the queue or closed token.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public interface ReceiveOrClosed<in E> {
+ val offerResult: Any // OFFER_SUCCESS | Closed
+ fun tryResumeReceive(value: E, idempotent: Any?): Any?
+ fun completeResumeReceive(token: Any)
+}
+
+/**
+ * Represents sender for a specific element.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+@Suppress("UNCHECKED_CAST")
+public class SendElement(
+ override val pollResult: Any?,
+ @JvmField val cont: CancellableContinuation<Unit>
+) : LockFreeLinkedListNode(), Send {
+ override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
+ override fun completeResumeSend(token: Any) = cont.completeResume(token)
+ override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
+ override fun toString(): String = "SendElement($pollResult)[$cont]"
+}
+
+/**
+ * Represents closed channel.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public class Closed<in E>(
+ @JvmField val closeCause: Throwable?
+) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
+ val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
+ val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
+
+ override val offerResult get() = this
+ override val pollResult get() = this
+ override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED
+ override fun completeResumeSend(token: Any) { check(token === CLOSE_RESUMED) }
+ override fun tryResumeReceive(value: E, idempotent: Any?): Any? = CLOSE_RESUMED
+ override fun completeResumeReceive(token: Any) { check(token === CLOSE_RESUMED) }
+ override fun resumeSendClosed(closed: Closed<*>) = error("Should be never invoked")
+ override fun toString(): String = "Closed[$closeCause]"
+}
+
+private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
+ override val offerResult get() = OFFER_SUCCESS
+ abstract fun resumeReceiveClosed(closed: Closed<*>)
+}
+
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
new file mode 100644
index 0000000..ffdf694
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.internalAnnotations.*
+import kotlinx.coroutines.experimental.selects.*
+
+/**
+ * Broadcast channel with array buffer of a fixed [capacity].
+ * Sender suspends only when buffer is full due to one of the receives being slow to consume and
+ * receiver suspends only when buffer is empty.
+ *
+ * Note, that elements that are sent to the broadcast channel while there are no [openSubscription] subscribers are immediately
+ * lost.
+ *
+ * This channel is created by `BroadcastChannel(capacity)` factory function invocation.
+ *
+ * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
+ * The lock at each subscription is also used to manage concurrent attempts to receive from the same subscriber.
+ * The lists of suspended senders or receivers are lock-free.
+ */
+class ArrayBroadcastChannel<E>(
+ /**
+ * Buffer capacity.
+ */
+ val capacity: Int
+) : AbstractSendChannel<E>(), BroadcastChannel<E> {
+ init {
+ require(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
+ }
+
+ private val bufferLock = ReentrantLock()
+ private val buffer = arrayOfNulls<Any?>(capacity) // guarded by bufferLock
+
+ // head & tail are Long (64 bits) and we assume that they never wrap around
+ // head, tail, and size are guarded by bufferLock
+ @Volatile
+ private var head: Long = 0 // do modulo on use of head
+ @Volatile
+ private var tail: Long = 0 // do modulo on use of tail
+ @Volatile
+ private var size: Int = 0
+
+ /*
+ Writes to buffer are guarded by bufferLock, but reads from buffer are concurrent with writes
+ - Write element to buffer then write "tail" (volatile)
+ - Read "tail" (volatile), then read element from buffer
+ So read/writes to buffer need not be volatile
+ */
+
+ private val subs = subscriberList<Subscriber<E>>()
+
+ override val isBufferAlwaysFull: Boolean get() = false
+ override val isBufferFull: Boolean get() = size >= capacity
+
+ override fun openSubscription(): SubscriptionReceiveChannel<E> =
+ Subscriber(this).also {
+ updateHead(addSub = it)
+ }
+
+ override fun close(cause: Throwable?): Boolean {
+ if (!super.close(cause)) return false
+ checkSubOffers()
+ return true
+ }
+
+ // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
+ override fun offerInternal(element: E): Any {
+ bufferLock.withLock {
+ // check if closed for send (under lock, so size cannot change)
+ closedForSend?.let { return it }
+ val size = this.size
+ if (size >= capacity) return OFFER_FAILED
+ val tail = this.tail
+ buffer[(tail % capacity).toInt()] = element
+ this.size = size + 1
+ this.tail = tail + 1
+ }
+ // if offered successfully, then check subs outside of lock
+ checkSubOffers()
+ return OFFER_SUCCESS
+ }
+
+ // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
+ override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+ bufferLock.withLock {
+ // check if closed for send (under lock, so size cannot change)
+ closedForSend?.let { return it }
+ val size = this.size
+ if (size >= capacity) return OFFER_FAILED
+ // let's try to select sending this element to buffer
+ if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
+ return ALREADY_SELECTED
+ }
+ val tail = this.tail
+ buffer[(tail % capacity).toInt()] = element
+ this.size = size + 1
+ this.tail = tail + 1
+ }
+ // if offered successfully, then check subs outside of lock
+ checkSubOffers()
+ return OFFER_SUCCESS
+ }
+
+ private fun checkSubOffers() {
+ var updated = false
+ var hasSubs = false
+ @Suppress("LoopToCallChain") // must invoke `checkOffer` on every sub
+ for (sub in subs) {
+ hasSubs = true
+ if (sub.checkOffer()) updated = true
+ }
+ if (updated || !hasSubs)
+ updateHead()
+ }
+
+ // updates head if needed and optionally adds / removes subscriber under the same lock
+ private tailrec fun updateHead(addSub: Subscriber<E>? = null, removeSub: Subscriber<E>? = null) {
+ // update head in a tail rec loop
+ var send: Send? = null
+ var token: Any? = null
+ bufferLock.withLock {
+ if (addSub != null) {
+ addSub.subHead = tail // start from last element
+ val wasEmpty = subs.isEmpty()
+ subs.add(addSub)
+ if (!wasEmpty) return // no need to update when adding second and etc sub
+ }
+ if (removeSub != null) {
+ subs.remove(removeSub)
+ if (head != removeSub.subHead) return // no need to update
+ }
+ val minHead = computeMinHead()
+ val tail = this.tail
+ var head = this.head
+ val targetHead = minHead.coerceAtMost(tail)
+ if (targetHead <= head) return // nothing to do -- head was already moved
+ var size = this.size
+ // clean up removed (on not need if we don't have any subscribers anymore)
+ while (head < targetHead) {
+ buffer[(head % capacity).toInt()] = null
+ val wasFull = size >= capacity
+ // update the size before checking queue (no more senders can queue up)
+ this.head = ++head
+ this.size = --size
+ if (wasFull) {
+ while (true) {
+ send = takeFirstSendOrPeekClosed() ?: break // when when no sender
+ if (send is Closed<*>) break // break when closed for send
+ token = send!!.tryResumeSend(idempotent = null)
+ if (token != null) {
+ // put sent element to the buffer
+ buffer[(tail % capacity).toInt()] = (send as Send).pollResult
+ this.size = size + 1
+ this.tail = tail + 1
+ return@withLock // go out of lock to wakeup this sender
+ }
+ }
+ }
+ }
+ return // done updating here -> return
+ }
+ // we only get out of the lock normally when there is a sender to resume
+ send!!.completeResumeSend(token!!)
+ // since we've just sent an element, we might need to resume some receivers
+ checkSubOffers()
+ // tailrec call to recheck
+ updateHead()
+ }
+
+ private fun computeMinHead(): Long {
+ var minHead = Long.MAX_VALUE
+ for (sub in subs)
+ minHead = minHead.coerceAtMost(sub.subHead) // volatile (atomic) reads of subHead
+ return minHead
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun elementAt(index: Long): E = buffer[(index % capacity).toInt()] as E
+
+ private class Subscriber<E>(
+ private val broadcastChannel: ArrayBroadcastChannel<E>
+ ) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
+ private val subLock = ReentrantLock()
+
+ @Volatile
+ @JvmField
+ var subHead: Long = 0 // guarded by subLock
+
+ override val isBufferAlwaysEmpty: Boolean get() = false
+ override val isBufferEmpty: Boolean get() = subHead >= broadcastChannel.tail
+ override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
+ override val isBufferFull: Boolean get() = error("Should not be used")
+
+ override fun cancel(cause: Throwable?): Boolean =
+ close(cause).also { closed ->
+ if (closed) broadcastChannel.updateHead(removeSub = this)
+ }
+
+ // returns true if subHead was updated and broadcast channel's head must be checked
+ // this method is lock-free (it never waits on lock)
+ @Suppress("UNCHECKED_CAST")
+ fun checkOffer(): Boolean {
+ var updated = false
+ var closed: Closed<*>? = null
+ loop@
+ while (needsToCheckOfferWithoutLock()) {
+ // just use `tryLock` here and break when some other thread is checking under lock
+ // it means that `checkOffer` must be retried after every `unlock`
+ if (!subLock.tryLock()) break
+ val receive: ReceiveOrClosed<E>?
+ val token: Any?
+ try {
+ val result = peekUnderLock()
+ when {
+ result === POLL_FAILED -> continue@loop // must retest `needsToCheckOfferWithoutLock` outside of the lock
+ result is Closed<*> -> {
+ closed = result
+ break@loop // was closed
+ }
+ }
+ // find a receiver for an element
+ receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving
+ if (receive is Closed<*>) break // noting more to do if this sub already closed
+ token = receive.tryResumeReceive(result as E, idempotent = null)
+ if (token == null) continue // bail out here to next iteration (see for next receiver)
+ val subHead = this.subHead
+ this.subHead = subHead + 1 // retrieved element for this subscriber
+ updated = true
+ } finally {
+ subLock.unlock()
+ }
+ receive!!.completeResumeReceive(token!!)
+ }
+ // do close outside of lock if needed
+ closed?.also { close(cause = it.closeCause) }
+ return updated
+ }
+
+ // result is `E | POLL_FAILED | Closed`
+ override fun pollInternal(): Any? {
+ var updated = false
+ val result = subLock.withLock {
+ val result = peekUnderLock()
+ when {
+ result is Closed<*> -> { /* just bail out of lock */ }
+ result === POLL_FAILED -> { /* just bail out of lock */ }
+ else -> {
+ // update subHead after retrieiving element from buffer
+ val subHead = this.subHead
+ this.subHead = subHead + 1
+ updated = true
+ }
+ }
+ result
+ }
+ // do close outside of lock
+ (result as? Closed<*>)?.also { close(cause = it.closeCause) }
+ // there could have been checkOffer attempt while we were holding lock
+ // now outside the lock recheck if anything else to offer
+ if (checkOffer())
+ updated = true
+ // and finally update broadcast's channel head if needed
+ if (updated)
+ broadcastChannel.updateHead()
+ return result
+ }
+
+ // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+ override fun pollSelectInternal(select: SelectInstance<*>): Any? {
+ var updated = false
+ val result = subLock.withLock {
+ var result = peekUnderLock()
+ when {
+ result is Closed<*> -> { /* just bail out of lock */ }
+ result === POLL_FAILED -> { /* just bail out of lock */ }
+ else -> {
+ // let's try to select receiving this element from buffer
+ if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
+ result = ALREADY_SELECTED
+ } else {
+ // update subHead after retrieiving element from buffer
+ val subHead = this.subHead
+ this.subHead = subHead + 1
+ updated = true
+ }
+ }
+ }
+ result
+ }
+ // do close outside of lock
+ (result as? Closed<*>)?.also { close(cause = it.closeCause) }
+ // there could have been checkOffer attempt while we were holding lock
+ // now outside the lock recheck if anything else to offer
+ if (checkOffer())
+ updated = true
+ // and finally update broadcast's channel head if needed
+ if (updated)
+ broadcastChannel.updateHead()
+ return result
+ }
+
+ // Must invoke this check this after lock, because offer's invocation of `checkOffer` might have failed
+ // to `tryLock` just before the lock was about to unlocked, thus loosing notification to this
+ // subscription about an element that was just offered
+ private fun needsToCheckOfferWithoutLock(): Boolean {
+ if (closedForReceive != null)
+ return false // already closed -> nothing to do
+ if (isBufferEmpty && broadcastChannel.closedForReceive == null)
+ return false // no data for us && broadcast channel was not closed yet -> nothing to do
+ return true // check otherwise
+ }
+
+ // guarded by lock, returns:
+ // E - the element from the buffer at subHead
+ // Closed<*> when closed;
+ // POLL_FAILED when there seems to be no data, but must retest `needsToCheckOfferWithoutLock` outside of lock
+ private fun peekUnderLock(): Any? {
+ val subHead = this.subHead // guarded read (can be non-volatile read)
+ // note: from the broadcastChannel we must read closed token first, then read its tail
+ // because it is Ok if tail moves in between the reads (we make decision based on tail first)
+ val closedBroadcast = broadcastChannel.closedForReceive // unguarded volatile read
+ val tail = broadcastChannel.tail // unguarded volatile read
+ if (subHead >= tail) {
+ // no elements to poll from the queue -- check if closed broads & closed this sub
+ // must retest `needsToCheckOfferWithoutLock` outside of the lock
+ return closedBroadcast ?: this.closedForReceive ?: POLL_FAILED
+ }
+ // Get tentative result. This result may be wrong (completely invalid value, including null),
+ // because this subscription might get closed, moving channel's head past this subscription's head.
+ val result = broadcastChannel.elementAt(subHead)
+ // now check if this subscription was closed
+ val closedSub = this.closedForReceive
+ if (closedSub != null) return closedSub
+ // we know the subscription was not closed, so this tentative result is Ok to return
+ return result
+ }
+ }
+
+ // ------ debug ------
+
+ override val bufferDebugString: String
+ get() = "(buffer:capacity=${buffer.size},size=$size)"
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
new file mode 100644
index 0000000..b118a89
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.internalAnnotations.Volatile
+import kotlinx.coroutines.experimental.selects.*
+
+/**
+ * Channel with array buffer of a fixed [capacity].
+ * Sender suspends only when buffer is fully and receiver suspends only when buffer is empty.
+ *
+ * This channel is created by `Channel(capacity)` factory function invocation.
+ *
+ * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
+ * The lists of suspended senders or receivers are lock-free.
+ */
+public open class ArrayChannel<E>(
+ /**
+ * Buffer capacity.
+ */
+ val capacity: Int
+) : AbstractChannel<E>() {
+ init {
+ require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
+ }
+
+ private val lock = ReentrantLock()
+ private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
+ private var head: Int = 0
+ @Volatile
+ private var size: Int = 0
+
+ protected final override val isBufferAlwaysEmpty: Boolean get() = false
+ protected final override val isBufferEmpty: Boolean get() = size == 0
+ protected final override val isBufferAlwaysFull: Boolean get() = false
+ protected final override val isBufferFull: Boolean get() = size == capacity
+
+ // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
+ protected override fun offerInternal(element: E): Any {
+ var receive: ReceiveOrClosed<E>? = null
+ var token: Any? = null
+ lock.withLock {
+ val size = this.size
+ closedForSend?.let { return it }
+ if (size < capacity) {
+ // tentatively put element to buffer
+ this.size = size + 1 // update size before checking queue (!!!)
+ // check for receivers that were waiting on empty queue
+ if (size == 0) {
+ loop@ while (true) {
+ receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
+ if (receive is Closed) {
+ this.size = size // restore size
+ return receive!!
+ }
+ token = receive!!.tryResumeReceive(element, idempotent = null)
+ if (token != null) {
+ this.size = size // restore size
+ return@withLock
+ }
+ }
+ }
+ buffer[(head + size) % capacity] = element // actually queue element
+ return OFFER_SUCCESS
+ }
+ // size == capacity: full
+ return OFFER_FAILED
+ }
+ // breaks here if offer meets receiver
+ receive!!.completeResumeReceive(token!!)
+ return receive!!.offerResult
+ }
+
+ // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
+ protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+ var receive: ReceiveOrClosed<E>? = null
+ var token: Any? = null
+ lock.withLock {
+ val size = this.size
+ closedForSend?.let { return it }
+ if (size < capacity) {
+ // tentatively put element to buffer
+ this.size = size + 1 // update size before checking queue (!!!)
+ // check for receivers that were waiting on empty queue
+ if (size == 0) {
+ loop@ while (true) {
+ val offerOp = describeTryOffer(element)
+ val failure = select.performAtomicTrySelect(offerOp)
+ when {
+ failure == null -> { // offered successfully
+ this.size = size // restore size
+ receive = offerOp.result
+ token = offerOp.resumeToken
+ check(token != null)
+ return@withLock
+ }
+ failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
+ failure === ALREADY_SELECTED || failure is Closed<*> -> {
+ this.size = size // restore size
+ return failure
+ }
+ else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
+ }
+ }
+ }
+ // let's try to select sending this element to buffer
+ if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
+ this.size = size // restore size
+ return ALREADY_SELECTED
+ }
+ buffer[(head + size) % capacity] = element // actually queue element
+ return OFFER_SUCCESS
+ }
+ // size == capacity: full
+ return OFFER_FAILED
+ }
+ // breaks here if offer meets receiver
+ receive!!.completeResumeReceive(token!!)
+ return receive!!.offerResult
+ }
+
+ // result is `E | POLL_FAILED | Closed`
+ protected override fun pollInternal(): Any? {
+ var send: Send? = null
+ var token: Any? = null
+ var result: Any? = null
+ lock.withLock {
+ val size = this.size
+ if (size == 0) return closedForSend ?: POLL_FAILED // when nothing can be read from buffer
+ // size > 0: not empty -- retrieve element
+ result = buffer[head]
+ buffer[head] = null
+ this.size = size - 1 // update size before checking queue (!!!)
+ // check for senders that were waiting on full queue
+ var replacement: Any? = POLL_FAILED
+ if (size == capacity) {
+ loop@ while (true) {
+ send = takeFirstSendOrPeekClosed() ?: break
+ token = send!!.tryResumeSend(idempotent = null)
+ if (token != null) {
+ replacement = send!!.pollResult
+ break@loop
+ }
+ }
+ }
+ if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
+ this.size = size // restore size
+ buffer[(head + size) % capacity] = replacement
+ }
+ head = (head + 1) % capacity
+ }
+ // complete send the we're taken replacement from
+ if (token != null)
+ send!!.completeResumeSend(token!!)
+ return result
+ }
+
+ // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+ protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
+ var send: Send? = null
+ var token: Any? = null
+ var result: Any? = null
+ lock.withLock {
+ val size = this.size
+ if (size == 0) return closedForSend ?: POLL_FAILED
+ // size > 0: not empty -- retrieve element
+ result = buffer[head]
+ buffer[head] = null
+ this.size = size - 1 // update size before checking queue (!!!)
+ // check for senders that were waiting on full queue
+ var replacement: Any? = POLL_FAILED
+ if (size == capacity) {
+ loop@ while (true) {
+ val pollOp = describeTryPoll()
+ val failure = select.performAtomicTrySelect(pollOp)
+ when {
+ failure == null -> { // polled successfully
+ send = pollOp.result
+ token = pollOp.resumeToken
+ check(token != null)
+ replacement = send!!.pollResult
+ break@loop
+ }
+ failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
+ failure === ALREADY_SELECTED -> {
+ this.size = size // restore size
+ buffer[head] = result // restore head
+ return failure
+ }
+ failure is Closed<*> -> {
+ send = failure
+ token = failure.tryResumeSend(idempotent = null)
+ replacement = failure
+ break@loop
+ }
+ else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
+ }
+ }
+ }
+ if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
+ this.size = size // restore size
+ buffer[(head + size) % capacity] = replacement
+ } else {
+ // failed to poll or is already closed --> let's try to select receiving this element from buffer
+ if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
+ this.size = size // restore size
+ buffer[head] = result // restore head
+ return ALREADY_SELECTED
+ }
+ }
+ head = (head + 1) % capacity
+ }
+ // complete send the we're taken replacement from
+ if (token != null)
+ send!!.completeResumeSend(token!!)
+ return result
+ }
+
+ // Note: this function is invoked when channel is already closed
+ override fun cleanupSendQueueOnCancel() {
+ // clear buffer first
+ lock.withLock {
+ repeat(size) {
+ buffer[head] = 0
+ head = (head + 1) % capacity
+ }
+ size = 0
+ }
+ // then clean all queued senders
+ super.cleanupSendQueueOnCancel()
+ }
+
+ // ------ debug ------
+
+ override val bufferDebugString: String
+ get() = "(buffer:capacity=${buffer.size},size=$size)"
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt
new file mode 100644
index 0000000..0a9c589
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
+import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
+import kotlinx.coroutines.experimental.internal.Closeable
+
+/**
+ * Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
+ * that subscribe for the elements using [openSubscription] function and unsubscribe using [SubscriptionReceiveChannel.close]
+ * function.
+ *
+ * See `BroadcastChannel()` factory function for the description of available
+ * broadcast channel implementations.
+ */
+public interface BroadcastChannel<E> : SendChannel<E> {
+ /**
+ * Factory for broadcast channels.
+ * @suppress **Deprecated**
+ */
+ public companion object Factory {
+ /**
+ * Creates a broadcast channel with the specified buffer capacity.
+ * @suppress **Deprecated**
+ */
+ @Deprecated("Replaced with top-level function", level = DeprecationLevel.HIDDEN)
+ public operator fun <E> invoke(capacity: Int): BroadcastChannel<E> = BroadcastChannel(capacity)
+ }
+
+ /**
+ * Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
+ * The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this
+ * broadcast channel.
+ */
+ public fun openSubscription(): SubscriptionReceiveChannel<E>
+
+ /**
+ * @suppress **Deprecated**: Renamed to [openSubscription]
+ */
+ @Deprecated(message = "Renamed to `openSubscription`",
+ replaceWith = ReplaceWith("openSubscription()"))
+ public fun open(): SubscriptionReceiveChannel<E> = openSubscription()
+}
+
+/**
+ * Creates a broadcast channel with the specified buffer capacity.
+ *
+ * The resulting channel type depends on the specified [capacity] parameter:
+ * * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel];
+ * * when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
+ * * otherwise -- throws [IllegalArgumentException].
+ */
+public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
+ when (capacity) {
+ 0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
+ UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
+ CONFLATED -> ConflatedBroadcastChannel()
+ else -> ArrayBroadcastChannel(capacity)
+ }
+
+/**
+ * Return type for [BroadcastChannel.openSubscription] that can be used to [receive] elements from the
+ * open subscription and to [close] it to unsubscribe.
+ *
+ * Note, that invocation of [cancel] also closes subscription.
+ */
+public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
+ /**
+ * Closes this subscription. This is a synonym for [cancel].
+ */
+ public override fun close() { cancel() }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
new file mode 100644
index 0000000..912f87e
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -0,0 +1,331 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CancellationException
+import kotlinx.coroutines.experimental.CoroutineScope
+import kotlinx.coroutines.experimental.Job
+import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
+import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
+import kotlinx.coroutines.experimental.selects.SelectClause1
+import kotlinx.coroutines.experimental.selects.SelectClause2
+import kotlinx.coroutines.experimental.selects.select
+import kotlinx.coroutines.experimental.yield
+
+/**
+ * Sender's interface to [Channel].
+ */
+public interface SendChannel<in E> {
+ /**
+ * Returns `true` if this channel was closed by invocation of [close] and thus
+ * the [send] and [offer] attempts throws exception.
+ */
+ public val isClosedForSend: Boolean
+
+ /**
+ * Returns `true` if the channel is full (out of capacity) and the [send] attempt will suspend.
+ * This function returns `false` for [isClosedForSend] channel.
+ */
+ public val isFull: Boolean
+
+ /**
+ * Adds [element] into to this channel, suspending the caller while this channel [isFull],
+ * or throws exception if the channel [isClosedForSend] (see [close] for details).
+ *
+ * Note, that closing a channel _after_ this function had suspended does not cause this suspended send invocation
+ * to abort, because closing a channel is conceptually like sending a special "close token" over this channel.
+ * All elements that are sent over the channel are delivered in first-in first-out order. The element that
+ * is being sent will get delivered to receivers before a close token.
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ *
+ * *Cancellation of suspended send is atomic* -- when this function
+ * throws [CancellationException] it means that the [element] was not sent to this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this send operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ *
+ * This function can be used in [select] invocation with [onSend] clause.
+ * Use [offer] to try sending to this channel without waiting.
+ */
+ public suspend fun send(element: E)
+
+ /**
+ * Clause for [select] expression of [send] suspending function that selects when the element that is specified
+ * as parameter is sent to the channel. When the clause is selected the reference to this channel
+ * is passed into the corresponding block.
+ *
+ * The [select] invocation fails with exception if the channel [isClosedForSend] (see [close] for details).
+ */
+ public val onSend: SelectClause2<E, SendChannel<E>>
+
+ /**
+ * Adds [element] into this queue if it is possible to do so immediately without violating capacity restrictions
+ * and returns `true`. Otherwise, it returns `false` immediately
+ * or throws exception if the channel [isClosedForSend] (see [close] for details).
+ */
+ public fun offer(element: E): Boolean
+
+ /**
+ * Closes this channel with an optional exceptional [cause].
+ * This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
+ * Conceptually, its sends a special "close token" over this channel.
+ *
+ * Immediately after invocation of this function
+ * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
+ * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
+ * are received.
+ *
+ * A channel that was closed without a [cause] throws [ClosedSendChannelException] on attempts to send or receive.
+ * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
+ * receive on a failed channel throw the specified [cause] exception.
+ */
+ public fun close(cause: Throwable? = null): Boolean
+}
+
+/**
+ * Receiver's interface to [Channel].
+ */
+public interface ReceiveChannel<out E> {
+ /**
+ * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
+ * side and all previously sent items were already received, so that the [receive] attempt
+ * throws [ClosedReceiveChannelException]. If the channel was closed because of the exception, it
+ * is considered closed, too, but it is called a _failed_ channel. All suspending attempts to receive
+ * an element from a failed channel throw the original [close][SendChannel.close] cause exception.
+ */
+ public val isClosedForReceive: Boolean
+
+ /**
+ * Returns `true` if the channel is empty (contains no elements) and the [receive] attempt will suspend.
+ * This function returns `false` for [isClosedForReceive] channel.
+ */
+ public val isEmpty: Boolean
+
+ /**
+ * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
+ * or throws [ClosedReceiveChannelException] if the channel [isClosedForReceive].
+ * If the channel was closed because of the exception, it is called a _failed_ channel and this function
+ * throws the original [close][SendChannel.close] cause exception.
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ *
+ * *Cancellation of suspended receive is atomic* -- when this function
+ * throws [CancellationException] it means that the element was not retrieved from this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ *
+ * This function can be used in [select] invocation with [onReceive] clause.
+ * Use [poll] to try receiving from this channel without waiting.
+ */
+ public suspend fun receive(): E
+
+ /**
+ * Clause for [select] expression of [receive] suspending function that selects with the element that
+ * is received from the channel.
+ * The [select] invocation fails with exception if the channel
+ * [isClosedForReceive] (see [close][SendChannel.close] for details).
+ */
+ public val onReceive: SelectClause1<E>
+
+ /**
+ * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
+ * or returns `null` if the channel is [closed][isClosedForReceive] without cause
+ * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ *
+ * *Cancellation of suspended receive is atomic* -- when this function
+ * throws [CancellationException] it means that the element was not retrieved from this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ *
+ * This function can be used in [select] invocation with [onReceiveOrNull] clause.
+ * Use [poll] to try receiving from this channel without waiting.
+ */
+ public suspend fun receiveOrNull(): E?
+
+ /**
+ * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
+ * is received from the channel or selects with `null` if if the channel
+ * [isClosedForReceive] without cause. The [select] invocation fails with
+ * the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ */
+ public val onReceiveOrNull: SelectClause1<E?>
+
+ /**
+ * Retrieves and removes the element from this channel, or returns `null` if this channel [isEmpty]
+ * or is [isClosedForReceive] without cause.
+ * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ */
+ public fun poll(): E?
+
+ /**
+ * Returns new iterator to receive elements from this channels using `for` loop.
+ * Iteration completes normally when the channel is [isClosedForReceive] without cause and
+ * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ */
+ public operator fun iterator(): ChannelIterator<E>
+
+ /**
+ * Cancels reception of remaining elements from this channel. This function closes the channel with
+ * the specified cause (unless it was already closed) and removes all buffered sent elements from it.
+ * This function returns `true` if the channel was not closed previously, or `false` otherwise.
+ *
+ * Immediately after invocation of this function [isClosedForReceive] and
+ * [isClosedForSend][SendChannel.isClosedForSend]
+ * on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
+ * afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
+ * [ClosedReceiveChannelException] if it was cancelled without a cause.
+ * A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
+ * receive on a failed channel throw the specified [cause] exception.
+ */
+ public fun cancel(cause: Throwable? = null): Boolean
+}
+
+/**
+ * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
+ * from concurrent coroutines.
+ */
+public interface ChannelIterator<out E> {
+ /**
+ * Returns `true` if the channel has more elements suspending the caller while this channel
+ * [isEmpty][ReceiveChannel.isEmpty] or returns `false` if the channel
+ * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause.
+ * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ *
+ * This function retrieves and removes the element from this channel for the subsequent invocation
+ * of [next].
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ *
+ * *Cancellation of suspended receive is atomic* -- when this function
+ * throws [CancellationException] it means that the element was not retrieved from this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ */
+ public suspend operator fun hasNext(): Boolean
+
+ /**
+ * Retrieves and removes the element from this channel suspending the caller while this channel
+ * [isEmpty][ReceiveChannel.isEmpty] or throws [ClosedReceiveChannelException] if the channel
+ * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause.
+ * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ *
+ * *Cancellation of suspended receive is atomic* -- when this function
+ * throws [CancellationException] it means that the element was not retrieved from this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ */
+ public suspend operator fun next(): E
+}
+
+/**
+ * Channel is a non-blocking primitive for communication between sender using [SendChannel] and receiver using [ReceiveChannel].
+ * Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
+ * but it has suspending operations instead of blocking ones and it can be closed.
+ *
+ * See `Channel(capacity)` factory function for the description of available channel implementations.
+ */
+public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
+ /**
+ * Constants for channel factory function `Channel()`.
+ */
+ public companion object Factory {
+ /**
+ * Requests channel with unlimited capacity buffer in `Channel(...)` factory function --
+ * the [LinkedListChannel] gets created.
+ */
+ public const val UNLIMITED = Int.MAX_VALUE
+
+ /**
+ * Requests conflated channel in `Channel(...)` factory function --
+ * the [ConflatedChannel] gets created.
+ */
+ public const val CONFLATED = -1
+
+ /**
+ * Creates a channel with the specified buffer capacity (or without a buffer by default).
+ * @suppress **Deprecated**
+ */
+ @Deprecated("Replaced with top-level function", level = DeprecationLevel.HIDDEN)
+ public operator fun <E> invoke(capacity: Int = 0): Channel<E> = Channel(capacity)
+ }
+}
+
+/**
+ * Creates a channel without a buffer -- [RendezvousChannel].
+ */
+public fun <E> Channel(): Channel<E> = RendezvousChannel<E>()
+
+/**
+ * Creates a channel with the specified buffer capacity (or without a buffer by default).
+ *
+ * The resulting channel type depends on the specified [capacity] parameter:
+ * * when `capacity` is 0 -- creates [RendezvousChannel] without a buffer;
+ * * when `capacity` is [Channel.UNLIMITED] -- creates [LinkedListChannel] with buffer of unlimited size;
+ * * when `capacity` is [Channel.CONFLATED] -- creates [ConflatedChannel] that conflates back-to-back sends;
+ * * when `capacity` is positive, but less than [UNLIMITED] -- creates [ArrayChannel] with a buffer of the specified `capacity`;
+ * * otherwise -- throws [IllegalArgumentException].
+ */
+public fun <E> Channel(capacity: Int): Channel<E> =
+ when (capacity) {
+ 0 -> RendezvousChannel()
+ UNLIMITED -> LinkedListChannel()
+ CONFLATED -> ConflatedChannel()
+ else -> ArrayChannel(capacity)
+ }
+
+/**
+ * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel
+ * that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
+ * exception on send attempts.
+ */
+public class ClosedSendChannelException(message: String?) : CancellationException(message)
+
+/**
+ * Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
+ * channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
+ * exception on receive attempts.
+ */
+public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
new file mode 100644
index 0000000..a23c2dc
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+internal open class ChannelCoroutine<E>(
+ parentContext: CoroutineContext,
+ protected val _channel: Channel<E>,
+ active: Boolean
+) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
+ val channel: Channel<E>
+ get() = this
+
+ // Workaround for KT-23094
+ override suspend fun receive(): E = _channel.receive()
+
+ override suspend fun send(element: E) = _channel.send(element)
+
+ override suspend fun receiveOrNull(): E? = _channel.receiveOrNull()
+
+ override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
new file mode 100644
index 0000000..ecb4262
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
@@ -0,0 +1,1551 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
+
+
+// -------- Conversions to ReceiveChannel --------
+
+/**
+ * Returns a channel to read all element of the [Iterable].
+ */
+public fun <E> Iterable<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> =
+ produce(context) {
+ for (element in this@asReceiveChannel)
+ send(element)
+ }
+
+/**
+ * Returns a channel to read all element of the [Sequence].
+ */
+public fun <E> Sequence<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> =
+ produce(context) {
+ for (element in this@asReceiveChannel)
+ send(element)
+ }
+
+// -------- Operations on BroadcastChannel --------
+
+/**
+ * Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements
+ * from it by always invoking [cancel][SubscriptionReceiveChannel.cancel] after the execution of the block.
+ */
+public inline fun <E, R> BroadcastChannel<E>.consume(block: SubscriptionReceiveChannel<E>.() -> R): R {
+ val channel = openSubscription()
+ try {
+ return channel.block()
+ } finally {
+ channel.cancel()
+ }
+}
+
+/**
+ * Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
+ */
+public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit) =
+ consume {
+ for (element in this) action(element)
+ }
+
+/**
+ * @suppress: **Deprecated**: binary compatibility with old code
+ */
+@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> BroadcastChannel<E>.consumeEach(action: suspend (E) -> Unit) =
+ consumeEach { action(it) }
+
+// -------- Operations on ReceiveChannel --------
+
+/**
+ * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on the [ReceiveChannel]
+ * with the corresponding cause. See also [ReceiveChannel.consume].
+ *
+ * **WARNING**: It is planned that in the future a second invocation of this method
+ * on an channel that is already being consumed is going to fail fast, that is
+ * immediately throw an [IllegalStateException].
+ * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
+ * for details.
+ */
+public fun ReceiveChannel<*>.consumes(): CompletionHandler =
+ { cause: Throwable? -> cancel(cause) }
+
+/**
+ * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on all the
+ * specified [ReceiveChannel] instances with the corresponding cause.
+ * See also [ReceiveChannel.consumes()] for a version on one channel.
+ */
+public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
+ { cause: Throwable? ->
+ var exception: Throwable? = null
+ for (channel in channels)
+ try {
+ channel.cancel(cause)
+ } catch (e: Throwable) {
+ if (exception == null) {
+ exception = e
+ } else {
+ exception.addSuppressedThrowable(e)
+ }
+ }
+ exception?.let { throw it }
+ }
+
+/**
+ * Makes sure that the given [block] consumes all elements from the given channel
+ * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
+ *
+ * **WARNING**: It is planned that in the future a second invocation of this method
+ * on an channel that is already being consumed is going to fail fast, that is
+ * immediately throw an [IllegalStateException].
+ * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
+ * for details.
+ *
+ * The operation is _terminal_.
+ */
+public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
+ var cause: Throwable? = null
+ try {
+ return block()
+ } catch (e: Throwable) {
+ cause = e
+ throw e
+ } finally {
+ cancel(cause)
+ }
+}
+
+/**
+ * Performs the given [action] for each received element.
+ *
+ * **WARNING**: It is planned that in the future a second invocation of this method
+ * on an channel that is already being consumed is going to fail fast, that is
+ * immediately throw an [IllegalStateException].
+ * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
+ * for details.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) =
+ consume {
+ for (e in this) action(e)
+ }
+
+/**
+ * @suppress: **Deprecated**: binary compatibility with old code
+ */
+@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit) =
+ consumeEach { action(it) }
+
+/**
+ * Performs the given [action] for each received element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) {
+ var index = 0
+ consumeEach {
+ action(IndexedValue(index++, it))
+ }
+}
+
+/**
+ * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E =
+ elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") }
+
+/**
+ * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E =
+ consume {
+ if (index < 0)
+ return defaultValue(index)
+ var count = 0
+ for (element in this) {
+ if (index == count++)
+ return element
+ }
+ return defaultValue(index)
+ }
+
+/**
+ * Returns an element at the given [index] or `null` if the [index] is out of bounds of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
+ consume {
+ if (index < 0)
+ return null
+ var count = 0
+ for (element in this) {
+ if (index == count++)
+ return element
+ }
+ return null
+ }
+
+/**
+ * Returns the first element matching the given [predicate], or `null` if no such element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? =
+ firstOrNull(predicate)
+
+/**
+ * Returns the last element matching the given [predicate], or `null` if no such element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? =
+ lastOrNull(predicate)
+
+/**
+ * Returns first element.
+ * @throws [NoSuchElementException] if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.first(): E =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ throw NoSuchElementException("ReceiveChannel is empty.")
+ return iterator.next()
+ }
+
+/**
+ * Returns the first element matching the given [predicate].
+ * @throws [NoSuchElementException] if no such element is found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E {
+ consumeEach {
+ if (predicate(it)) return it
+ }
+ throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
+}
+
+/**
+ * Returns the first element, or `null` if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ return null
+ return iterator.next()
+ }
+
+/**
+ * Returns the first element matching the given [predicate], or `null` if element was not found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.firstOrNull(predicate: (E) -> Boolean): E? {
+ consumeEach {
+ if (predicate(it)) return it
+ }
+ return null
+}
+
+/**
+ * Returns first index of [element], or -1 if the channel does not contain element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
+ var index = 0
+ consumeEach {
+ if (element == it)
+ return index
+ index++
+ }
+ return -1
+}
+
+/**
+ * Returns index of the first element matching the given [predicate], or -1 if the channel does not contain such element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int {
+ var index = 0
+ consumeEach {
+ if (predicate(it))
+ return index
+ index++
+ }
+ return -1
+}
+
+/**
+ * Returns index of the last element matching the given [predicate], or -1 if the channel does not contain such element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.indexOfLast(predicate: (E) -> Boolean): Int {
+ var lastIndex = -1
+ var index = 0
+ consumeEach {
+ if (predicate(it))
+ lastIndex = index
+ index++
+ }
+ return lastIndex
+}
+
+/**
+ * Returns the last element.
+ * @throws [NoSuchElementException] if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.last(): E =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ throw NoSuchElementException("ReceiveChannel is empty.")
+ var last = iterator.next()
+ while (iterator.hasNext())
+ last = iterator.next()
+ return last
+ }
+
+/**
+ * Returns the last element matching the given [predicate].
+ * @throws [NoSuchElementException] if no such element is found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E {
+ var last: E? = null
+ var found = false
+ consumeEach {
+ if (predicate(it)) {
+ last = it
+ found = true
+ }
+ }
+ if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
+ @Suppress("UNCHECKED_CAST")
+ return last as E
+}
+
+/**
+ * Returns last index of [element], or -1 if the channel does not contain element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
+ var lastIndex = -1
+ var index = 0
+ consumeEach {
+ if (element == it)
+ lastIndex = index
+ index++
+ }
+ return lastIndex
+}
+
+/**
+ * Returns the last element, or `null` if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ return null
+ var last = iterator.next()
+ while (iterator.hasNext())
+ last = iterator.next()
+ return last
+ }
+
+/**
+ * Returns the last element matching the given [predicate], or `null` if no such element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? {
+ var last: E? = null
+ consumeEach {
+ if (predicate(it)) {
+ last = it
+ }
+ }
+ return last
+}
+
+/**
+ * Returns the single element, or throws an exception if the channel is empty or has more than one element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.single(): E =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ throw NoSuchElementException("ReceiveChannel is empty.")
+ val single = iterator.next()
+ if (iterator.hasNext())
+ throw IllegalArgumentException("ReceiveChannel has more than one element.")
+ return single
+ }
+
+/**
+ * Returns the single element matching the given [predicate], or throws exception if there is no or more than one matching element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E {
+ var single: E? = null
+ var found = false
+ consumeEach {
+ if (predicate(it)) {
+ if (found) throw IllegalArgumentException("ReceiveChannel contains more than one matching element.")
+ single = it
+ found = true
+ }
+ }
+ if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
+ @Suppress("UNCHECKED_CAST")
+ return single as E
+}
+
+/**
+ * Returns single element, or `null` if the channel is empty or has more than one element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ return null
+ val single = iterator.next()
+ if (iterator.hasNext())
+ return null
+ return single
+ }
+
+/**
+ * Returns the single element matching the given [predicate], or `null` if element was not found or more than one element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? {
+ var single: E? = null
+ var found = false
+ consumeEach {
+ if (predicate(it)) {
+ if (found) return null
+ single = it
+ found = true
+ }
+ }
+ if (!found) return null
+ return single
+}
+
+/**
+ * Returns a channel containing all elements except first [n] elements.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
+ produce(context, onCompletion = consumes()) {
+ require(n >= 0) { "Requested element count $n is less than zero." }
+ var remaining: Int = n
+ if (remaining > 0)
+ for (e in this@drop) {
+ remaining--
+ if (remaining == 0)
+ break
+ }
+ for (e in this@drop) {
+ send(e)
+ }
+ }
+
+/**
+ * Returns a channel containing all elements except first elements that satisfy the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+ produce(context, onCompletion = consumes()) {
+ for (e in this@dropWhile) {
+ if (!predicate(e)) {
+ send(e)
+ break
+ }
+ }
+ for (e in this@dropWhile) {
+ send(e)
+ }
+ }
+
+/**
+ * Returns a channel containing only elements matching the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+ produce(context, onCompletion = consumes()) {
+ for (e in this@filter) {
+ if (predicate(e)) send(e)
+ }
+ }
+
+/**
+ * Returns a channel containing only elements matching the given [predicate].
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> =
+ produce(context, onCompletion = consumes()) {
+ var index = 0
+ for (e in this@filterIndexed) {
+ if (predicate(index++, e)) send(e)
+ }
+ }
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
+ consumeEachIndexed { (index, element) ->
+ if (predicate(index, element)) destination.add(element)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
+ consumeEachIndexed { (index, element) ->
+ if (predicate(index, element)) destination.send(element)
+ }
+ return destination
+}
+
+/**
+ * Returns a channel containing all elements not matching the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.filterNot(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+ filter(context) { !predicate(it) }
+
+/**
+ * @suppress **Deprecated**: For binary compatibility only
+ */
+@Deprecated("For binary compatibility only", level = DeprecationLevel.HIDDEN)
+public fun <E> ReceiveChannel<E>.filterNot(predicate: suspend (E) -> Boolean): ReceiveChannel<E> = filterNot(predicate = predicate)
+
+/**
+ * Returns a channel containing all elements that are not `null`.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+@Suppress("UNCHECKED_CAST")
+public fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
+ filter { it != null } as ReceiveChannel<E>
+
+/**
+ * Appends all elements that are not `null` to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
+ consumeEach {
+ if (it != null) destination.add(it)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements that are not `null` to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
+ consumeEach {
+ if (it != null) destination.send(it)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements not matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
+ consumeEach {
+ if (!predicate(it)) destination.add(it)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements not matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
+ consumeEach {
+ if (!predicate(it)) destination.send(it)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
+ consumeEach {
+ if (predicate(it)) destination.add(it)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
+ consumeEach {
+ if (predicate(it)) destination.send(it)
+ }
+ return destination
+}
+
+/**
+ * Returns a channel containing first [n] elements.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
+ produce(context, onCompletion = consumes()) {
+ if (n == 0) return@produce
+ require(n >= 0) { "Requested element count $n is less than zero." }
+ var remaining: Int = n
+ for (e in this@take) {
+ send(e)
+ remaining--
+ if (remaining == 0)
+ return@produce
+ }
+ }
+
+/**
+ * Returns a channel containing first elements satisfying the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+ produce(context, onCompletion = consumes()) {
+ for (e in this@takeWhile) {
+ if (!predicate(e)) return@produce
+ send(e)
+ }
+ }
+
+/**
+ * Returns a [Map] containing key-value pairs provided by [transform] function
+ * applied to elements of the given channel.
+ *
+ * If any of two pairs would have the same key the last one gets added to the map.
+ *
+ * The returned map preserves the entry iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> =
+ associateTo(LinkedHashMap(), transform)
+
+/**
+ * Returns a [Map] containing the elements from the given channel indexed by the key
+ * returned from [keySelector] function applied to each element.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The returned map preserves the entry iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> =
+ associateByTo(LinkedHashMap(), keySelector)
+
+/**
+ * Returns a [Map] containing the values provided by [valueTransform] and indexed by [keySelector] functions applied to elements of the given channel.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The returned map preserves the entry iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> =
+ associateByTo(LinkedHashMap(), keySelector, valueTransform)
+
+/**
+ * Populates and returns the [destination] mutable map with key-value pairs,
+ * where key is provided by the [keySelector] function applied to each element of the given channel
+ * and value is the element itself.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M {
+ consumeEach {
+ destination.put(keySelector(it), it)
+ }
+ return destination
+}
+
+/**
+ * Populates and returns the [destination] mutable map with key-value pairs,
+ * where key is provided by the [keySelector] function and
+ * and value is provided by the [valueTransform] function applied to elements of the given channel.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
+ consumeEach {
+ destination.put(keySelector(it), valueTransform(it))
+ }
+ return destination
+}
+
+/**
+ * Populates and returns the [destination] mutable map with key-value pairs
+ * provided by [transform] function applied to each element of the given channel.
+ *
+ * If any of two pairs would have the same key the last one gets added to the map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M {
+ consumeEach {
+ destination += transform(it)
+ }
+ return destination
+}
+
+/**
+ * Send each element of the original channel
+ * and appends the results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
+ consumeEach {
+ destination.send(it)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements to the given [destination] collection.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
+ consumeEach {
+ destination.add(it)
+ }
+ return destination
+}
+
+/**
+ * Returns a [List] containing all elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toList(): List<E> =
+ this.toMutableList()
+
+/**
+ * Returns a [Map] filled with all elements of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> =
+ toMap(LinkedHashMap())
+
+/**
+ * Returns a [MutableMap] filled with all elements of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
+ consumeEach {
+ destination += it
+ }
+ return destination
+}
+
+/**
+ * Returns a [MutableList] filled with all elements of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> =
+ toCollection(ArrayList())
+
+/**
+ * Returns a [Set] of all elements.
+ *
+ * The returned set preserves the element iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> =
+ this.toMutableSet()
+
+/**
+ * Returns a single channel of all elements from results of [transform] function being invoked on each element of original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
+ produce(context, onCompletion = consumes()) {
+ for (e in this@flatMap) {
+ transform(e).toChannel(this)
+ }
+ }
+
+/**
+ * Groups elements of the original channel by the key returned by the given [keySelector] function
+ * applied to each element and returns a map where each group key is associated with a list of corresponding elements.
+ *
+ * The returned map preserves the entry iteration order of the keys produced from the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> =
+ groupByTo(LinkedHashMap(), keySelector)
+
+/**
+ * Groups values returned by the [valueTransform] function applied to each element of the original channel
+ * by the key returned by the given [keySelector] function applied to the element
+ * and returns a map where each group key is associated with a list of corresponding values.
+ *
+ * The returned map preserves the entry iteration order of the keys produced from the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> =
+ groupByTo(LinkedHashMap(), keySelector, valueTransform)
+
+/**
+ * Groups elements of the original channel by the key returned by the given [keySelector] function
+ * applied to each element and puts to the [destination] map each group key associated with a list of corresponding elements.
+ *
+ * @return The [destination] map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M {
+ consumeEach {
+ val key = keySelector(it)
+ val list = destination.getOrPut(key) { ArrayList() }
+ list.add(it)
+ }
+ return destination
+}
+
+/**
+ * Groups values returned by the [valueTransform] function applied to each element of the original channel
+ * by the key returned by the given [keySelector] function applied to the element
+ * and puts to the [destination] map each group key associated with a list of corresponding values.
+ *
+ * @return The [destination] map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
+ consumeEach {
+ val key = keySelector(it)
+ val list = destination.getOrPut(key) { ArrayList() }
+ list.add(valueTransform(it))
+ }
+ return destination
+}
+
+/**
+ * Returns a channel containing the results of applying the given [transform] function
+ * to each element in the original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
+ produce(context, onCompletion = consumes()) {
+ consumeEach {
+ send(transform(it))
+ }
+ }
+
+/**
+ * Returns a channel containing the results of applying the given [transform] function
+ * to each element and its index in the original channel.
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> =
+ produce(context, onCompletion = consumes()) {
+ var index = 0
+ for (e in this@mapIndexed) {
+ send(transform(index++, e))
+ }
+ }
+
+/**
+ * Returns a channel containing only the non-null results of applying the given [transform] function
+ * to each element and its index in the original channel.
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(context: CoroutineContext = Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel<R> =
+ mapIndexed(context, transform).filterNotNull()
+
+/**
+ * Applies the given [transform] function to each element and its index in the original channel
+ * and appends only the non-null results to the given [destination].
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
+ consumeEachIndexed { (index, element) ->
+ transform(index, element)?.let { destination.add(it) }
+ }
+ return destination
+}
+
+/**
+ * Applies the given [transform] function to each element and its index in the original channel
+ * and appends only the non-null results to the given [destination].
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
+ consumeEachIndexed { (index, element) ->
+ transform(index, element)?.let { destination.send(it) }
+ }
+ return destination
+}
+
+/**
+ * Applies the given [transform] function to each element and its index in the original channel
+ * and appends the results to the given [destination].
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
+ var index = 0
+ consumeEach {
+ destination.add(transform(index++, it))
+ }
+ return destination
+}
+
+/**
+ * Applies the given [transform] function to each element and its index in the original channel
+ * and appends the results to the given [destination].
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
+ var index = 0
+ consumeEach {
+ destination.send(transform(index++, it))
+ }
+ return destination
+}
+
+/**
+ * Returns a channel containing only the non-null results of applying the given [transform] function
+ * to each element in the original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(context: CoroutineContext = Unconfined, transform: suspend (E) -> R?): ReceiveChannel<R> =
+ map(context, transform).filterNotNull()
+
+/**
+ * Applies the given [transform] function to each element in the original channel
+ * and appends only the non-null results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
+ consumeEach {
+ transform(it)?.let { destination.add(it) }
+ }
+ return destination
+}
+
+/**
+ * Applies the given [transform] function to each element in the original channel
+ * and appends only the non-null results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
+ consumeEach {
+ transform(it)?.let { destination.send(it) }
+ }
+ return destination
+}
+
+/**
+ * Applies the given [transform] function to each element of the original channel
+ * and appends the results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
+ consumeEach {
+ destination.add(transform(it))
+ }
+ return destination
+}
+
+/**
+ * Applies the given [transform] function to each element of the original channel
+ * and appends the results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
+ consumeEach {
+ destination.send(transform(it))
+ }
+ return destination
+}
+
+/**
+ * Returns a channel of [IndexedValue] for each element of the original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Unconfined): ReceiveChannel<IndexedValue<E>> =
+ produce(context, onCompletion = consumes()) {
+ var index = 0
+ for (e in this@withIndex) {
+ send(IndexedValue(index++, e))
+ }
+ }
+
+/**
+ * Returns a channel containing only distinct elements from the given channel.
+ *
+ * The elements in the resulting channel are in the same order as they were in the source channel.
+ *
+ * The operation is _intermediate_ and _stateful_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> =
+ this.distinctBy { it }
+
+/**
+ * Returns a channel containing only elements from the given channel
+ * having distinct keys returned by the given [selector] function.
+ *
+ * The elements in the resulting channel are in the same order as they were in the source channel.
+ *
+ * The operation is _intermediate_ and _stateful_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> =
+ produce(context, onCompletion = consumes()) {
+ val keys = HashSet<K>()
+ for (e in this@distinctBy) {
+ val k = selector(e)
+ if (k !in keys) {
+ send(e)
+ keys += k
+ }
+ }
+ }
+
+/**
+ * Returns a mutable set containing all distinct elements from the given channel.
+ *
+ * The returned set preserves the element iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> =
+ toCollection(LinkedHashSet())
+
+/**
+ * Returns `true` if all elements match the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean {
+ consumeEach {
+ if (!predicate(it)) return false
+ }
+ return true
+}
+
+/**
+ * Returns `true` if channel has at least one element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
+ consume {
+ return iterator().hasNext()
+ }
+
+/**
+ * Returns `true` if at least one element matches the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean {
+ consumeEach {
+ if (predicate(it)) return true
+ }
+ return false
+}
+
+/**
+ * Returns the number of elements in this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.count(): Int {
+ var count = 0
+ consumeEach { count++ }
+ return count
+}
+
+/**
+ * Returns the number of elements matching the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int {
+ var count = 0
+ consumeEach {
+ if (predicate(it)) count++
+ }
+ return count
+}
+
+/**
+ * Accumulates value starting with [initial] value and applying [operation] from left to right to current accumulator value and each element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
+ var accumulator = initial
+ consumeEach {
+ accumulator = operation(accumulator, it)
+ }
+ return accumulator
+}
+
+/**
+ * Accumulates value starting with [initial] value and applying [operation] from left to right
+ * to current accumulator value and each element with its index in the original channel.
+ * @param [operation] function that takes the index of an element, current accumulator value
+ * and the element itself, and calculates the next accumulator value.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R {
+ var index = 0
+ var accumulator = initial
+ consumeEach {
+ accumulator = operation(index++, accumulator, it)
+ }
+ return accumulator
+}
+
+/**
+ * Returns the first element yielding the largest value of the given function or `null` if there are no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext()) return null
+ var maxElem = iterator.next()
+ var maxValue = selector(maxElem)
+ while (iterator.hasNext()) {
+ val e = iterator.next()
+ val v = selector(e)
+ if (maxValue < v) {
+ maxElem = e
+ maxValue = v
+ }
+ }
+ return maxElem
+ }
+
+/**
+ * Returns the first element having the largest value according to the provided [comparator] or `null` if there are no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext()) return null
+ var max = iterator.next()
+ while (iterator.hasNext()) {
+ val e = iterator.next()
+ if (comparator.compare(max, e) < 0) max = e
+ }
+ return max
+ }
+
+/**
+ * Returns the first element yielding the smallest value of the given function or `null` if there are no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext()) return null
+ var minElem = iterator.next()
+ var minValue = selector(minElem)
+ while (iterator.hasNext()) {
+ val e = iterator.next()
+ val v = selector(e)
+ if (minValue > v) {
+ minElem = e
+ minValue = v
+ }
+ }
+ return minElem
+ }
+
+/**
+ * Returns the first element having the smallest value according to the provided [comparator] or `null` if there are no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext()) return null
+ var min = iterator.next()
+ while (iterator.hasNext()) {
+ val e = iterator.next()
+ if (comparator.compare(min, e) > 0) min = e
+ }
+ return min
+ }
+
+/**
+ * Returns `true` if the channel has no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
+ consume {
+ return !iterator().hasNext()
+ }
+
+/**
+ * Returns `true` if no elements match the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean {
+ consumeEach {
+ if (predicate(it)) return false
+ }
+ return true
+}
+
+/**
+ * Accumulates value starting with the first element and applying [operation] from left to right to current accumulator value and each element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S =
+ consume {
+ val iterator = this.iterator()
+ if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
+ var accumulator: S = iterator.next()
+ while (iterator.hasNext()) {
+ accumulator = operation(accumulator, iterator.next())
+ }
+ return accumulator
+ }
+
+/**
+ * Accumulates value starting with the first element and applying [operation] from left to right
+ * to current accumulator value and each element with its index in the original channel.
+ * @param [operation] function that takes the index of an element, current accumulator value
+ * and the element itself and calculates the next accumulator value.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark operation with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public suspend inline fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S =
+ consume {
+ val iterator = this.iterator()
+ if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
+ var index = 1
+ var accumulator: S = iterator.next()
+ while (iterator.hasNext()) {
+ accumulator = operation(index++, accumulator, iterator.next())
+ }
+ return accumulator
+ }
+
+/**
+ * Returns the sum of all values produced by [selector] function applied to each element in the channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int {
+ var sum = 0
+ consumeEach {
+ sum += selector(it)
+ }
+ return sum
+}
+
+/**
+ * Returns the sum of all values produced by [selector] function applied to each element in the channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double {
+ var sum = 0.0
+ consumeEach {
+ sum += selector(it)
+ }
+ return sum
+}
+
+/**
+ * Returns an original collection containing all the non-`null` elements, throwing an [IllegalArgumentException] if there are any `null` elements.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
+ map { it ?: throw IllegalArgumentException("null element found in $this.") }
+
+/**
+ * Splits the original channel into pair of lists,
+ * where *first* list contains elements for which [predicate] yielded `true`,
+ * while *second* list contains elements for which [predicate] yielded `false`.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend inline fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> {
+ val first = ArrayList<E>()
+ val second = ArrayList<E>()
+ consumeEach {
+ if (predicate(it)) {
+ first.add(it)
+ } else {
+ second.add(it)
+ }
+ }
+ return Pair(first, second)
+}
+
+/**
+ * Returns a channel of pairs built from elements of both channels with same indexes.
+ * Resulting channel has length of shortest input channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of both the original [ReceiveChannel] and the `other` one.
+ */
+public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
+ zip(other) { t1, t2 -> t1 to t2 }
+
+/**
+ * Returns a channel of values built from elements of both collections with same indexes using provided [transform]. Resulting channel has length of shortest input channels.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of both the original [ReceiveChannel] and the `other` one.
+ */
+// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
+ produce(context, onCompletion = consumesAll(this, other)) {
+ val otherIterator = other.iterator()
+ this@zip.consumeEach { element1 ->
+ if (!otherIterator.hasNext()) return@consumeEach
+ val element2 = otherIterator.next()
+ send(transform(element1, element2))
+ }
+ }
+
+
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
new file mode 100644
index 0000000..a8ec1ff
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.internalAnnotations.*
+import kotlinx.coroutines.experimental.intrinsics.*
+import kotlinx.coroutines.experimental.selects.*
+
+/**
+ * Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
+ *
+ * Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received,
+ * while previously sent elements **are lost**.
+ * Every subscriber immediately receives the most recently sent element.
+ * Sender to this broadcast channel never suspends and [offer] always returns `true`.
+ *
+ * A secondary constructor can be used to create an instance of this class that already holds a value.
+ * This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
+ *
+ * This implementation is fully lock-free. In this implementation
+ * [opening][openSubscription] and [closing][SubscriptionReceiveChannel.close] subscription takes O(N) time, where N is the
+ * number of subscribers.
+ */
+public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
+ /**
+ * Creates an instance of this class that already holds a value.
+ *
+ * It is as a shortcut to creating an instance with a default constructor and
+ * immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
+ */
+ constructor(value: E) : this() {
+ _state.lazySet(State<E>(value, null))
+ }
+
+ private val _state = atomic<Any>(INITIAL_STATE) // State | Closed
+ private val _updating = atomic(0)
+
+ private companion object {
+ @JvmField
+ val CLOSED = Closed(null)
+
+ @JvmField
+ val UNDEFINED = Symbol("UNDEFINED")
+
+ @JvmField
+ val INITIAL_STATE = State<Any?>(UNDEFINED, null)
+ }
+
+ private class State<E>(
+ @JvmField val value: Any?, // UNDEFINED | E
+ @JvmField val subscribers: Array<Subscriber<E>>?
+ )
+
+ private class Closed(@JvmField val closeCause: Throwable?) {
+ val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
+ val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
+ }
+
+ /**
+ * The most recently sent element to this channel.
+ *
+ * Access to this property throws [IllegalStateException] when this class is constructed without
+ * initial value and no value was sent yet or if it was [closed][close] without a cause.
+ * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ */
+ @Suppress("UNCHECKED_CAST")
+ public val value: E get() {
+ _state.loop { state ->
+ when (state) {
+ is Closed -> throw state.valueException
+ is State<*> -> {
+ if (state.value === UNDEFINED) throw IllegalStateException("No value")
+ return state.value as E
+ }
+ else -> error("Invalid state $state")
+ }
+ }
+ }
+
+ /**
+ * The most recently sent element to this channel or `null` when this class is constructed without
+ * initial value and no value was sent yet or if it was [closed][close].
+ */
+ @Suppress("UNCHECKED_CAST")
+ public val valueOrNull: E? get() {
+ val state = _state.value
+ when (state) {
+ is Closed -> return null
+ is State<*> -> {
+ if (state.value === UNDEFINED) return null
+ return state.value as E
+ }
+ else -> error("Invalid state $state")
+ }
+ }
+
+ override val isClosedForSend: Boolean get() = _state.value is Closed
+ override val isFull: Boolean get() = false
+
+ @Suppress("UNCHECKED_CAST")
+ override fun openSubscription(): SubscriptionReceiveChannel<E> {
+ val subscriber = Subscriber<E>(this)
+ _state.loop { state ->
+ when (state) {
+ is Closed -> {
+ subscriber.close(state.closeCause)
+ return subscriber
+ }
+ is State<*> -> {
+ if (state.value !== UNDEFINED)
+ subscriber.offerInternal(state.value as E)
+ val update = State(state.value, addSubscriber((state as State<E>).subscribers, subscriber))
+ if (_state.compareAndSet(state, update))
+ return subscriber
+ }
+ else -> error("Invalid state $state")
+ }
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun closeSubscriber(subscriber: Subscriber<E>) {
+ _state.loop { state ->
+ when (state) {
+ is Closed -> return
+ is State<*> -> {
+ val update = State(state.value, removeSubscriber((state as State<E>).subscribers!!, subscriber))
+ if (_state.compareAndSet(state, update))
+ return
+ }
+ else -> error("Invalid state $state")
+ }
+ }
+ }
+
+ private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
+ if (list == null) return Array<Subscriber<E>>(1) { subscriber }
+ return list + subscriber
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun removeSubscriber(list: Array<Subscriber<E>>, subscriber: Subscriber<E>): Array<Subscriber<E>>? {
+ val n = list.size
+ val i = list.indexOf(subscriber)
+ check(i >= 0)
+ if (n == 1) return null
+ val update = arrayOfNulls<Subscriber<E>>(n - 1)
+ arraycopy(list, 0, update, 0, i)
+ arraycopy(list, i + 1, update, i, n - i - 1)
+ return update as Array<Subscriber<E>>
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ override fun close(cause: Throwable?): Boolean {
+ _state.loop { state ->
+ when (state) {
+ is Closed -> return false
+ is State<*> -> {
+ val update = if (cause == null) CLOSED else Closed(cause)
+ if (_state.compareAndSet(state, update)) {
+ (state as State<E>).subscribers?.forEach { it.close(cause) }
+ return true
+ }
+ }
+ else -> error("Invalid state $state")
+ }
+ }
+ }
+
+ /**
+ * Sends the value to all subscribed receives and stores this value as the most recent state for
+ * future subscribers. This implementation never suspends.
+ * It throws exception if the channel [isClosedForSend] (see [close] for details).
+ */
+ suspend override fun send(element: E) {
+ offerInternal(element)?.let { throw it.sendException }
+ }
+
+ /**
+ * Sends the value to all subscribed receives and stores this value as the most recent state for
+ * future subscribers. This implementation always returns `true`.
+ * It throws exception if the channel [isClosedForSend] (see [close] for details).
+ */
+ override fun offer(element: E): Boolean {
+ offerInternal(element)?.let { throw it.sendException }
+ return true
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun offerInternal(element: E): Closed? {
+ // If some other thread is updating the state in its offer operation we assume that our offer had linearized
+ // before that offer (we lost) and that offer overwrote us and conflated our offer.
+ if (!_updating.compareAndSet(0, 1)) return null
+ try {
+ _state.loop { state ->
+ when (state) {
+ is Closed -> return state
+ is State<*> -> {
+ val update = State(element, (state as State<E>).subscribers)
+ if (_state.compareAndSet(state, update)) {
+ // Note: Using offerInternal here to ignore the case when this subscriber was
+ // already concurrently closed (assume the close had conflated our offer for this
+ // particular subscriber).
+ state.subscribers?.forEach { it.offerInternal(element) }
+ return null
+ }
+ }
+ else -> error("Invalid state $state")
+ }
+ }
+ } finally {
+ _updating.value = 0 // reset the updating flag to zero even when something goes wrong
+ }
+ }
+
+ override val onSend: SelectClause2<E, SendChannel<E>>
+ get() = object : SelectClause2<E, SendChannel<E>> {
+ override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
+ registerSelectSend(select, param, block)
+ }
+ }
+
+ private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
+ if (!select.trySelect(null)) return
+ offerInternal(element)?.let {
+ select.resumeSelectCancellableWithException(it.sendException)
+ return
+ }
+ block.startCoroutineUndispatched(receiver = this, completion = select.completion)
+ }
+
+ private class Subscriber<E>(
+ private val broadcastChannel: ConflatedBroadcastChannel<E>
+ ) : ConflatedChannel<E>(), SubscriptionReceiveChannel<E> {
+ override fun cancel(cause: Throwable?): Boolean =
+ close(cause).also { closed ->
+ if (closed) broadcastChannel.closeSubscriber(this)
+ }
+
+ public override fun offerInternal(element: E): Any = super.offerInternal(element)
+ }
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt
new file mode 100644
index 0000000..ae98756
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
+import kotlinx.coroutines.experimental.selects.SelectInstance
+
+/**
+ * Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
+ * so that the receiver always gets the most recently sent element.
+ * Back-to-send sent elements are _conflated_ -- only the the most recently sent element is received,
+ * while previously sent elements **are lost**.
+ * Sender to this channel never suspends and [offer] always returns `true`.
+ *
+ * This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
+ *
+ * This implementation is fully lock-free.
+ */
+public open class ConflatedChannel<E> : AbstractChannel<E>() {
+ protected final override val isBufferAlwaysEmpty: Boolean get() = true
+ protected final override val isBufferEmpty: Boolean get() = true
+ protected final override val isBufferAlwaysFull: Boolean get() = false
+ protected final override val isBufferFull: Boolean get() = false
+
+ /**
+ * This implementation conflates last sent item when channel is closed.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ override fun onClosed(closed: Closed<E>) {
+ conflatePreviousSendBuffered(closed)
+ }
+
+ // result is always `OFFER_SUCCESS | Closed`
+ protected override fun offerInternal(element: E): Any {
+ while (true) {
+ val result = super.offerInternal(element)
+ when {
+ result === OFFER_SUCCESS -> return OFFER_SUCCESS
+ result === OFFER_FAILED -> { // try to buffer
+ val sendResult = sendConflated(element)
+ when (sendResult) {
+ null -> return OFFER_SUCCESS
+ is Closed<*> -> return sendResult
+ }
+ // otherwise there was receiver in queue, retry super.offerInternal
+ }
+ result is Closed<*> -> return result
+ else -> error("Invalid offerInternal result $result")
+ }
+ }
+ }
+
+ // result is always `ALREADY_SELECTED | OFFER_SUCCESS | Closed`.
+ protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+ while (true) {
+ val result = if (hasReceiveOrClosed)
+ super.offerSelectInternal(element, select) else
+ (select.performAtomicTrySelect(describeSendConflated(element)) ?: OFFER_SUCCESS)
+ when {
+ result === ALREADY_SELECTED -> return ALREADY_SELECTED
+ result === OFFER_SUCCESS -> return OFFER_SUCCESS
+ result === OFFER_FAILED -> {} // retry
+ result is Closed<*> -> return result
+ else -> error("Invalid result $result")
+ }
+ }
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt
new file mode 100644
index 0000000..f2962ab
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
+import kotlinx.coroutines.experimental.selects.SelectInstance
+
+/**
+ * Channel with linked-list buffer of a unlimited capacity (limited only by available memory).
+ * Sender to this channel never suspends and [offer] always returns `true`.
+ *
+ * This channel is created by `Channel(Channel.UNLIMITED)` factory function invocation.
+ *
+ * This implementation is fully lock-free.
+ */
+public open class LinkedListChannel<E> : AbstractChannel<E>() {
+ protected final override val isBufferAlwaysEmpty: Boolean get() = true
+ protected final override val isBufferEmpty: Boolean get() = true
+ protected final override val isBufferAlwaysFull: Boolean get() = false
+ protected final override val isBufferFull: Boolean get() = false
+
+ // result is always `OFFER_SUCCESS | Closed`
+ protected override fun offerInternal(element: E): Any {
+ while (true) {
+ val result = super.offerInternal(element)
+ when {
+ result === OFFER_SUCCESS -> return OFFER_SUCCESS
+ result === OFFER_FAILED -> { // try to buffer
+ val sendResult = sendBuffered(element)
+ when (sendResult) {
+ null -> return OFFER_SUCCESS
+ is Closed<*> -> return sendResult
+ }
+ // otherwise there was receiver in queue, retry super.offerInternal
+ }
+ result is Closed<*> -> return result
+ else -> error("Invalid offerInternal result $result")
+ }
+ }
+ }
+
+ // result is always `ALREADY_SELECTED | OFFER_SUCCESS | Closed`.
+ protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+ while (true) {
+ val result = if (hasReceiveOrClosed)
+ super.offerSelectInternal(element, select) else
+ (select.performAtomicTrySelect(describeSendBuffered(element)) ?: OFFER_SUCCESS)
+ when {
+ result === ALREADY_SELECTED -> return ALREADY_SELECTED
+ result === OFFER_SUCCESS -> return OFFER_SUCCESS
+ result === OFFER_FAILED -> {} // retry
+ result is Closed<*> -> return result
+ else -> error("Invalid result $result")
+ }
+ }
+ }
+}
+
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
new file mode 100644
index 0000000..fc86b0a
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * Scope for [produce] coroutine builder.
+ */
+public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
+ /**
+ * A reference to the channel that this coroutine [sends][send] elements to.
+ * It is provided for convenience, so that the code in the coroutine can refer
+ * to the channel as `channel` as apposed to `this`.
+ * All the [SendChannel] functions on this interface delegate to
+ * the channel instance returned by this function.
+ */
+ val channel: SendChannel<E>
+}
+
+/**
+ * @suppress **Deprecated**: Use `ReceiveChannel`.
+ */
+@Deprecated(message = "Use `ReceiveChannel`", replaceWith = ReplaceWith("ReceiveChannel"))
+@Suppress("MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_WHEN_NO_EXPLICIT_OVERRIDE")
+interface ProducerJob<out E> : ReceiveChannel<E>, Job {
+ @Deprecated(message = "Use ReceiveChannel itself")
+ val channel: ReceiveChannel<E>
+}
+
+/**
+ * Launches new coroutine to produce a stream of values by sending them to a channel
+ * and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
+ * object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
+ *
+ * The scope of the coroutine contains [ProducerScope] interface, which implements
+ * both [CoroutineScope] and [SendChannel], so that coroutine can invoke
+ * [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
+ * when the coroutine completes.
+ * The running coroutine is cancelled when its receive channel is [cancelled][ReceiveChannel.cancel].
+ *
+ * The [context] for the new coroutine can be explicitly specified.
+ * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
+ * The [coroutineContext] of the parent coroutine may be used,
+ * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
+ * The parent job may be also explicitly specified using [parent] parameter.
+ *
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ *
+ * Uncaught exceptions in this coroutine close the channel with this exception as a cause and
+ * the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
+ *
+ * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
+ *
+ * @param context context of the coroutine. The default value is [DefaultDispatcher].
+ * @param capacity capacity of the channel's buffer (no buffer by default).
+ * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
+ * @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
+ * @param block the coroutine code.
+ */
+public fun <E> produce(
+ context: CoroutineContext = DefaultDispatcher,
+ capacity: Int = 0,
+ parent: Job? = null,
+ onCompletion: CompletionHandler? = null,
+ block: suspend ProducerScope<E>.() -> Unit
+): ReceiveChannel<E> {
+ val channel = Channel<E>(capacity)
+ val newContext = newCoroutineContext(context, parent)
+ val coroutine = ProducerCoroutine(newContext, channel)
+ if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+ return coroutine
+}
+
+/** @suppress **Deprecated**: Binary compatibility */
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> produce(
+ context: CoroutineContext = DefaultDispatcher,
+ capacity: Int = 0,
+ parent: Job? = null,
+ block: suspend ProducerScope<E>.() -> Unit
+): ReceiveChannel<E> = produce(context, capacity, parent, block = block)
+
+/** @suppress **Deprecated**: Binary compatibility */
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> produce(
+ context: CoroutineContext = DefaultDispatcher,
+ capacity: Int = 0,
+ block: suspend ProducerScope<E>.() -> Unit
+): ProducerJob<E> =
+ produce(context, capacity, block = block) as ProducerJob<E>
+
+/**
+ * @suppress **Deprecated**: Renamed to `produce`.
+ */
+@Deprecated(message = "Renamed to `produce`", replaceWith = ReplaceWith("produce(context, capacity, block)"))
+public fun <E> buildChannel(
+ context: CoroutineContext,
+ capacity: Int = 0,
+ block: suspend ProducerScope<E>.() -> Unit
+): ProducerJob<E> =
+ produce(context, capacity, block = block) as ProducerJob<E>
+
+private class ProducerCoroutine<E>(
+ parentContext: CoroutineContext, channel: Channel<E>
+) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E>, ProducerJob<E> {
+ override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
+ val cause = exceptionally?.cause
+ val processed = when (exceptionally) {
+ is Cancelled -> _channel.cancel(cause) // producer coroutine was cancelled -- cancel channel
+ else -> _channel.close(cause) // producer coroutine has completed -- close channel
+ }
+ if (!processed && cause != null)
+ handleCoroutineException(context, cause)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
new file mode 100644
index 0000000..689e36f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+/**
+ * Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender
+ * to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
+ * until another coroutine invokes [receive] and [receive] suspends until another coroutine invokes [send].
+ *
+ * Use `Channel()` factory function to conveniently create an instance of rendezvous channel.
+ *
+ * This implementation is fully lock-free.
+ */
+public open class RendezvousChannel<E> : AbstractChannel<E>() {
+ protected final override val isBufferAlwaysEmpty: Boolean get() = true
+ protected final override val isBufferEmpty: Boolean get() = true
+ protected final override val isBufferAlwaysFull: Boolean get() = true
+ protected final override val isBufferFull: Boolean get() = true
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt
new file mode 100644
index 0000000..fe20163
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt
@@ -0,0 +1,6 @@
+package kotlinx.coroutines.experimental.internal
+
+/**
+ * Cross-platform array copy. Overlaps of source and destination are not supported
+ */
+expect fun <E> arraycopy(source: Array<E>, srcPos: Int, destination: Array<E?>, destinationStart: Int, length: Int)
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt
new file mode 100644
index 0000000..2b1f504
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt
@@ -0,0 +1,5 @@
+package kotlinx.coroutines.experimental.internal
+
+expect interface Closeable {
+ fun close()
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt
new file mode 100644
index 0000000..df36305
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt
@@ -0,0 +1,18 @@
+package kotlinx.coroutines.experimental.internal
+
+/**
+ * Special kind of list intended to be used as collection of subscribers in [ArrayBroadcastChannel]
+ * On JVM it's CopyOnWriteList and on JS it's MutableList.
+ *
+ * Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of ArrayBroadcastChannel
+ */
+typealias SubscribersList<E> = MutableList<E>
+
+expect fun <E> subscriberList(): SubscribersList<E>
+
+expect class ReentrantLock() {
+ fun tryLock(): Boolean
+ fun unlock(): Unit
+}
+
+expect inline fun <T> ReentrantLock.withLock(action: () -> T): T
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt
index 9cc3e14..e23fd98 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt
@@ -16,12 +16,12 @@
package kotlinx.coroutines.experimental.internal
-import kotlin.jvm.*
-
/** @suppress **This is unstable API and it is subject to change.** */
public expect open class LockFreeLinkedListNode() {
public val isRemoved: Boolean
+ public val next: Any
public val nextNode: LockFreeLinkedListNode
+ public val prev: Any
public val prevNode: LockFreeLinkedListNode
public fun addLast(node: LockFreeLinkedListNode)
public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean
@@ -57,11 +57,23 @@
val queue: LockFreeLinkedListNode
val node: T
protected override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
+ override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
+}
+
+public expect open class RemoveFirstDesc<T>(queue: LockFreeLinkedListNode): AbstractAtomicDesc {
+ val queue: LockFreeLinkedListNode
+ public val result: T
+ protected open fun validatePrepared(node: T): Boolean
+ protected final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
+ final override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}
/** @suppress **This is unstable API and it is subject to change.** */
public expect abstract class AbstractAtomicDesc : AtomicDesc {
- protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
final override fun prepare(op: AtomicOp<*>): Any?
final override fun complete(op: AtomicOp<*>, failure: Any?)
+ protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any?
+ protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean
+ protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
+ protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
new file mode 100644
index 0000000..84da40f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ArrayBroadcastChannelTest : TestBase() {
+
+ @Test
+ fun testBasic() = runTest {
+ expect(1)
+ val broadcast = ArrayBroadcastChannel<Int>(1)
+ assertFalse(broadcast.isClosedForSend)
+ val first = broadcast.openSubscription()
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ assertEquals(1, first.receive()) // suspends
+ assertFalse(first.isClosedForReceive)
+ expect(5)
+ assertEquals(2, first.receive()) // suspends
+ assertFalse(first.isClosedForReceive)
+ expect(10)
+ assertNull(first.receiveOrNull()) // suspends
+ assertTrue(first.isClosedForReceive)
+ expect(14)
+ }
+ expect(3)
+ broadcast.send(1)
+ expect(4)
+ yield() // to the first receiver
+ expect(6)
+
+ val second = broadcast.openSubscription()
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(7)
+ assertEquals(2, second.receive()) // suspends
+ assertFalse(second.isClosedForReceive)
+ expect(11)
+ assertNull(second.receiveOrNull()) // suspends
+ assertTrue(second.isClosedForReceive)
+ expect(15)
+ }
+ expect(8)
+ broadcast.send(2)
+ expect(9)
+ yield() // to first & second receivers
+ expect(12)
+ broadcast.close()
+ expect(13)
+ assertTrue(broadcast.isClosedForSend)
+ yield() // to first & second receivers
+ finish(16)
+ }
+
+ @Test
+ fun testSendSuspend() = runTest {
+ expect(1)
+ val broadcast = ArrayBroadcastChannel<Int>(1)
+ val first = broadcast.openSubscription()
+ launch(coroutineContext) {
+ expect(4)
+ assertEquals(1, first.receive())
+ expect(5)
+ assertEquals(2, first.receive())
+ expect(6)
+ }
+ expect(2)
+ broadcast.send(1) // puts to buffer, receiver not running yet
+ expect(3)
+ broadcast.send(2) // suspends
+ finish(7)
+ }
+
+ @Test
+ fun testConcurrentSendCompletion() = runTest {
+ expect(1)
+ val broadcast = ArrayBroadcastChannel<Int>(1)
+ val sub = broadcast.openSubscription()
+ // launch 3 concurrent senders (one goes buffer, two other suspend)
+ for (x in 1..3) {
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(x + 1)
+ broadcast.send(x)
+ }
+ }
+ // and close it for send
+ expect(5)
+ broadcast.close()
+ // now must receive all 3 items
+ expect(6)
+ assertFalse(sub.isClosedForReceive)
+ for (x in 1..3)
+ assertEquals(x, sub.receiveOrNull())
+ // and receive close signal
+ assertNull(sub.receiveOrNull())
+ assertTrue(sub.isClosedForReceive)
+ finish(7)
+ }
+
+ @Test
+ fun testForgetUnsubscribed() = runTest {
+ expect(1)
+ val broadcast = ArrayBroadcastChannel<Int>(1)
+ broadcast.send(1)
+ broadcast.send(2)
+ broadcast.send(3)
+ expect(2) // should not suspend anywhere above
+ val sub = broadcast.openSubscription()
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(3)
+ assertEquals(4, sub.receive()) // suspends
+ expect(5)
+ }
+ expect(4)
+ broadcast.send(4) // sends
+ yield()
+ finish(6)
+ }
+
+ @Test
+ fun testReceiveFullAfterClose() = runTest {
+ val channel = BroadcastChannel<Int>(10)
+ val sub = channel.openSubscription()
+ // generate into buffer & close
+ for (x in 1..5) channel.send(x)
+ channel.close()
+ // make sure all of them are consumed
+ check(!sub.isClosedForReceive)
+ for (x in 1..5) check(sub.receive() == x)
+ check(sub.receiveOrNull() == null)
+ check(sub.isClosedForReceive)
+ }
+
+ @Test
+ fun testCloseSubDuringIteration() = runTest {
+ val channel = BroadcastChannel<Int>(1)
+ // launch generator (for later) in this context
+ launch(coroutineContext) {
+ for (x in 1..5) channel.send(x)
+ channel.close()
+ }
+ // start consuming
+ val sub = channel.openSubscription()
+ var expected = 0
+ sub.consumeEach {
+ check(it == ++expected)
+ if (it == 2) {
+ sub.close()
+ }
+ }
+ check(expected == 2)
+ }
+
+ @Test
+ fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) {
+ val channel = BroadcastChannel<Int>(1)
+ val sub = channel.openSubscription()
+ assertFalse(sub.isClosedForReceive)
+ sub.close()
+ assertTrue(sub.isClosedForReceive)
+ sub.receive()
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
new file mode 100644
index 0000000..61fdaef
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ArrayChannelTest : TestBase() {
+
+ @Test
+ fun testSimple() = runTest {
+ val q = ArrayChannel<Int>(1)
+ check(q.isEmpty && !q.isFull)
+ expect(1)
+ val sender = launch(coroutineContext) {
+ expect(4)
+ q.send(1) // success -- buffered
+ check(!q.isEmpty && q.isFull)
+ expect(5)
+ q.send(2) // suspends (buffer full)
+ expect(9)
+ }
+ expect(2)
+ val receiver = launch(coroutineContext) {
+ expect(6)
+ check(q.receive() == 1) // does not suspend -- took from buffer
+ check(!q.isEmpty && q.isFull) // waiting sender's element moved to buffer
+ expect(7)
+ check(q.receive() == 2) // does not suspend (takes from sender)
+ expect(8)
+ }
+ expect(3)
+ sender.join()
+ receiver.join()
+ check(q.isEmpty && !q.isFull)
+ finish(10)
+ }
+
+ @Test
+ fun testClosedBufferedReceiveOrNull() = runTest {
+ val q = ArrayChannel<Int>(1)
+ check(q.isEmpty && !q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+ expect(1)
+ launch(coroutineContext) {
+ expect(5)
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+ assertEquals(42, q.receiveOrNull())
+ expect(6)
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+ assertEquals(null, q.receiveOrNull())
+ expect(7)
+ }
+ expect(2)
+ q.send(42) // buffers
+ expect(3)
+ q.close() // goes on
+ expect(4)
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+ yield()
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+ finish(8)
+ }
+
+ @Test
+ fun testClosedExceptions() = runTest {
+ val q = ArrayChannel<Int>(1)
+ expect(1)
+ launch(coroutineContext) {
+ expect(4)
+ try { q.receive() }
+ catch (e: ClosedReceiveChannelException) {
+ expect(5)
+ }
+ }
+ expect(2)
+
+ require(q.close())
+ expect(3)
+ yield()
+ expect(6)
+ try { q.send(42) }
+ catch (e: ClosedSendChannelException) {
+ finish(7)
+ }
+ }
+
+ @Test
+ fun testOfferAndPool() = runTest {
+ val q = ArrayChannel<Int>(1)
+ assertTrue(q.offer(1))
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ assertEquals(1, q.poll())
+ expect(4)
+ assertEquals(null, q.poll())
+ expect(5)
+ assertEquals(2, q.receive()) // suspends
+ expect(9)
+ assertEquals(3, q.poll())
+ expect(10)
+ assertEquals(null, q.poll())
+ expect(11)
+ }
+ expect(2)
+ yield()
+ expect(6)
+ assertTrue(q.offer(2))
+ expect(7)
+ assertTrue(q.offer(3))
+ expect(8)
+ assertFalse(q.offer(4))
+ yield()
+ finish(12)
+ }
+
+ @Test
+ fun testConsumeAll() = runTest {
+ val q = ArrayChannel<Int>(5)
+ for (i in 1..10) {
+ if (i <= 5) {
+ expect(i)
+ q.send(i) // shall buffer
+ } else {
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(i)
+ q.send(i) // suspends
+ expectUnreached() // will get cancelled by cancel
+ }
+ }
+ }
+ expect(11)
+ q.cancel()
+ check(q.isClosedForSend)
+ check(q.isClosedForReceive)
+ check(q.receiveOrNull() == null)
+ finish(12)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt
new file mode 100644
index 0000000..2bbc4a1
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlin.test.*
+
+
+class BroadcastChannelFactoryTest {
+
+ @Test
+ fun testRendezvousChannelNotSupported() {
+ assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(0) }
+ }
+
+ @Test
+ fun testLinkedListChannelNotSupported() {
+ assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(Channel.UNLIMITED) }
+ }
+
+ @Test
+ fun testConflatedBroadcastChannel() {
+ assertTrue { BroadcastChannel<Int>(Channel.CONFLATED) is ConflatedBroadcastChannel }
+ }
+
+ @Test
+ fun testArrayBroadcastChannel() {
+ assertTrue { BroadcastChannel<Int>(1) is ArrayBroadcastChannel }
+ assertTrue { BroadcastChannel<Int>(10) is ArrayBroadcastChannel }
+ }
+
+ @Test
+ fun testInvalidCapacityNotSupported() {
+ assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(-2) }
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt
new file mode 100644
index 0000000..d4c5126
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.test.*
+
+
+class ChannelFactoryTest : TestBase() {
+
+ @Test
+ fun testRendezvousChannel() {
+ assertTrue(Channel<Int>() is RendezvousChannel)
+ assertTrue(Channel<Int>(0) is RendezvousChannel)
+ }
+
+ @Test
+ fun testLinkedListChannel() {
+ assertTrue(Channel<Int>(Channel.UNLIMITED) is LinkedListChannel)
+ }
+
+ @Test
+ fun testConflatedChannel() {
+ assertTrue(Channel<Int>(Channel.CONFLATED) is ConflatedChannel)
+ }
+
+ @Test
+ fun testArrayChannel() {
+ assertTrue(Channel<Int>(1) is ArrayChannel)
+ assertTrue(Channel<Int>(10) is ArrayChannel)
+ }
+
+ @Test
+ fun testInvalidCapacityNotSupported() = runTest({ it is IllegalArgumentException }) {
+ Channel<Int>(-2)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
new file mode 100644
index 0000000..0fe1fc9
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
@@ -0,0 +1,569 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.math.*
+import kotlin.test.*
+
+class ChannelsTest: TestBase() {
+ private val testList = listOf(1, 2, 3)
+
+ @Test
+ fun testIterableAsReceiveChannel() = runTest {
+ assertEquals(testList, testList.asReceiveChannel().toList())
+ }
+
+ @Test
+ fun testSequenceAsReceiveChannel() = runTest {
+ assertEquals(testList, testList.asSequence().asReceiveChannel().toList())
+ }
+
+ @Test
+ fun testAssociate() = runTest {
+ assertEquals(testList.associate { it * 2 to it * 3 },
+ testList.asReceiveChannel().associate { it * 2 to it * 3 }.toMap())
+ }
+
+ @Test
+ fun testAssociateBy() = runTest {
+ assertEquals(testList.associateBy { it % 2 }, testList.asReceiveChannel().associateBy { it % 2 })
+ }
+
+ @Test
+ fun testAssociateBy2() = runTest {
+ assertEquals(testList.associateBy({ it * 2}, { it * 3 }),
+ testList.asReceiveChannel().associateBy({ it * 2}, { it * 3 }).toMap())
+ }
+
+ @Test
+ fun testDistinct() = runTest {
+ assertEquals(testList.map { it % 2 }.distinct(), testList.asReceiveChannel().map { it % 2 }.distinct().toList())
+ }
+
+ @Test
+ fun testDistinctBy() = runTest {
+ assertEquals(testList.distinctBy { it % 2 }.toList(), testList.asReceiveChannel().distinctBy { it % 2 }.toList())
+ }
+
+ @Test
+ fun testToCollection() = runTest {
+ val target = mutableListOf<Int>()
+ testList.asReceiveChannel().toCollection(target)
+ assertEquals(testList, target)
+ }
+
+ @Test
+ fun testDrop() = runTest {
+ for (i in 0..testList.size) {
+ assertEquals(testList.drop(i), testList.asReceiveChannel().drop(i).toList(), "Drop $i")
+ }
+ }
+
+ @Test
+ fun testElementAtOrElse() = runTest {
+ assertEquals(testList.elementAtOrElse(2) { 42 }, testList.asReceiveChannel().elementAtOrElse(2) { 42 })
+ assertEquals(testList.elementAtOrElse(9) { 42 }, testList.asReceiveChannel().elementAtOrElse(9) { 42 })
+ }
+
+ @Test
+ fun testFirst() = runTest {
+ assertEquals(testList.first(), testList.asReceiveChannel().first())
+ for (i in testList) {
+ assertEquals(testList.first { it == i }, testList.asReceiveChannel().first { it == i })
+ }
+ try {
+ testList.asReceiveChannel().first { it == 9 }
+ fail()
+ } catch (nse: NoSuchElementException) {
+ }
+ }
+
+ @Test
+ fun testFirstOrNull() = runTest {
+ assertEquals(testList.firstOrNull(), testList.asReceiveChannel().firstOrNull())
+ assertEquals(testList.firstOrNull { it == 2 }, testList.asReceiveChannel().firstOrNull { it == 2 })
+ assertEquals(testList.firstOrNull { it == 9 }, testList.asReceiveChannel().firstOrNull { it == 9 })
+ }
+
+ @Test
+ fun testFlatMap() = runTest {
+ assertEquals(testList.flatMap { (0..it).toList() }, testList.asReceiveChannel().flatMap { (0..it).asReceiveChannel() }.toList())
+
+ }
+
+ @Test
+ fun testFold() = runTest {
+ assertEquals(testList.fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } },
+ testList.asReceiveChannel().fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } }.toList())
+ }
+
+ @Test
+ fun testFoldIndexed() = runTest {
+ assertEquals(testList.foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } },
+ testList.asReceiveChannel().foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } }.toList())
+ }
+
+ @Test
+ fun testGroupBy() = runTest {
+ assertEquals(testList.groupBy { it % 2 }, testList.asReceiveChannel().groupBy { it % 2 })
+ }
+
+ @Test
+ fun testGroupBy2() = runTest {
+ assertEquals(testList.groupBy({ -it }, { it + 100 }), testList.asReceiveChannel().groupBy({ -it }, { it + 100 }).toMap())
+
+ }
+
+ @Test
+ fun testMap() = runTest {
+ assertEquals(testList.map { it + 10 }, testList.asReceiveChannel().map { it + 10 }.toList())
+
+ }
+
+ @Test
+ fun testMapToCollection() = runTest {
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().mapTo(c) { it + 10 }
+ assertEquals(testList.map { it + 10 }, c)
+ }
+
+ @Test
+ fun testMapToSendChannel() = runTest {
+ val c = produce<Int> {
+ testList.asReceiveChannel().mapTo(channel) { it + 10 }
+ }
+ assertEquals(testList.map { it + 10 }, c.toList())
+ }
+
+ @Test
+ fun testEmptyList() = runTest {
+ assertTrue(emptyList<Nothing>().asReceiveChannel().toList().isEmpty())
+ }
+
+ @Test
+ fun testToList() = runTest {
+ assertEquals(testList, testList.asReceiveChannel().toList())
+
+ }
+
+ @Test
+ fun testEmptySet() = runTest {
+ assertTrue(emptyList<Nothing>().asReceiveChannel().toSet().isEmpty())
+
+ }
+
+ @Test
+ fun testToSet() = runTest {
+ assertEquals(testList.toSet(), testList.asReceiveChannel().toSet())
+ }
+
+ @Test
+ fun testToMutableSet() = runTest {
+ assertEquals(testList.toMutableSet(), testList.asReceiveChannel().toMutableSet())
+ }
+
+ @Test
+ fun testEmptySequence() = runTest {
+ val channel = Channel<Nothing>()
+ channel.close()
+
+ assertTrue(emptyList<Nothing>().asReceiveChannel().count() == 0)
+ }
+
+ @Test
+ fun testEmptyMap() = runTest {
+ val channel = Channel<Pair<Nothing, Nothing>>()
+ channel.close()
+
+ assertTrue(channel.toMap().isEmpty())
+ }
+
+ @Test
+ fun testToMap() = runTest {
+ val values = testList.map { it to it.toString() }
+ assertEquals(values.toMap(), values.asReceiveChannel().toMap())
+ }
+
+ @Test
+ fun testReduce() = runTest {
+ assertEquals(testList.reduce { acc, e -> acc * e },
+ testList.asReceiveChannel().reduce { acc, e -> acc * e })
+ }
+
+ @Test
+ fun testReduceIndexed() = runTest {
+ assertEquals(testList.reduceIndexed { index, acc, e -> index + acc * e },
+ testList.asReceiveChannel().reduceIndexed { index, acc, e -> index + acc * e })
+ }
+
+ @Test
+ fun testTake() = runTest {
+ for (i in 0..testList.size) {
+ assertEquals(testList.take(i), testList.asReceiveChannel().take(i).toList())
+ }
+ }
+
+ @Test
+ fun testPartition() = runTest {
+ assertEquals(testList.partition { it % 2 == 0 }, testList.asReceiveChannel().partition { it % 2 == 0 })
+ }
+
+ @Test
+ fun testZip() = runTest {
+ val other = listOf("a", "b")
+ assertEquals(testList.zip(other), testList.asReceiveChannel().zip(other.asReceiveChannel()).toList())
+ }
+
+ @Test
+ fun testElementAt() = runTest {
+ testList.indices.forEach { i ->
+ assertEquals(testList[i], testList.asReceiveChannel().elementAt(i))
+ }
+ }
+
+ @Test
+ fun testElementAtOrNull() = runTest {
+ testList.indices.forEach { i ->
+ assertEquals(testList[i], testList.asReceiveChannel().elementAtOrNull(i))
+ }
+ assertEquals(null, testList.asReceiveChannel().elementAtOrNull(-1))
+ assertEquals(null, testList.asReceiveChannel().elementAtOrNull(testList.size))
+ }
+
+ @Test
+ fun testFind() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.find { it % 2 == mod },
+ testList.asReceiveChannel().find { it % 2 == mod })
+ }
+ }
+
+ @Test
+ fun testFindLast() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.findLast { it % 2 == mod }, testList.asReceiveChannel().findLast { it % 2 == mod })
+ }
+ }
+
+ @Test
+ fun testIndexOf() = runTest {
+ repeat(testList.size + 1) { i ->
+ assertEquals(testList.indexOf(i), testList.asReceiveChannel().indexOf(i))
+ }
+ }
+
+ @Test
+ fun testLastIndexOf() = runTest {
+ repeat(testList.size + 1) { i ->
+ assertEquals(testList.lastIndexOf(i), testList.asReceiveChannel().lastIndexOf(i))
+ }
+ }
+
+ @Test
+ fun testIndexOfFirst() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.indexOfFirst { it % 2 == mod },
+ testList.asReceiveChannel().indexOfFirst { it % 2 == mod })
+ }
+ }
+
+ @Test
+ fun testIndexOfLast() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.indexOfLast { it % 2 != mod },
+ testList.asReceiveChannel().indexOfLast { it % 2 != mod })
+ }
+ }
+
+ @Test
+ fun testLastOrNull() = runTest {
+ assertEquals(testList.lastOrNull(), testList.asReceiveChannel().lastOrNull())
+ assertEquals(null, emptyList<Int>().asReceiveChannel().lastOrNull())
+ }
+
+ @Test
+ fun testSingleOrNull() = runTest {
+ assertEquals(1, listOf(1).asReceiveChannel().singleOrNull())
+ assertEquals(null, listOf(1, 2).asReceiveChannel().singleOrNull())
+ assertEquals(null, emptyList<Int>().asReceiveChannel().singleOrNull())
+ repeat(testList.size + 1) { i ->
+ assertEquals(testList.singleOrNull { it == i },
+ testList.asReceiveChannel().singleOrNull { it == i })
+ }
+ repeat(3) { mod ->
+ assertEquals(testList.singleOrNull { it % 2 == mod },
+ testList.asReceiveChannel().singleOrNull { it % 2 == mod })
+ }
+ }
+
+ @Test
+ fun testDropWhile() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.dropWhile { it % 2 == mod },
+ testList.asReceiveChannel().dropWhile { it % 2 == mod }.toList())
+ }
+ }
+
+ @Test
+ fun testFilter() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.filter { it % 2 == mod },
+ testList.asReceiveChannel().filter { it % 2 == mod }.toList())
+ }
+ }
+
+ @Test
+ fun testFilterToCollection() = runTest {
+ repeat(3) { mod ->
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().filterTo(c) { it % 2 == mod }
+ assertEquals(testList.filter { it % 2 == mod }, c)
+ }
+ }
+
+ @Test
+ fun testFilterToSendChannel() = runTest {
+ repeat(3) { mod ->
+ val c = produce<Int> {
+ testList.asReceiveChannel().filterTo(channel) { it % 2 == mod }
+ }
+ assertEquals(testList.filter { it % 2 == mod }, c.toList())
+ }
+ }
+
+ @Test
+ fun testFilterNot() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.filterNot { it % 2 == mod },
+ testList.asReceiveChannel().filterNot { it % 2 == mod }.toList())
+ }
+ }
+
+ @Test
+ fun testFilterNotToCollection() = runTest {
+ repeat(3) { mod ->
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().filterNotTo(c) { it % 2 == mod }
+ assertEquals(testList.filterNot { it % 2 == mod }, c)
+ }
+ }
+
+ @Test
+ fun testFilterNotToSendChannel() = runTest {
+ repeat(3) { mod ->
+ val c = produce<Int> {
+ testList.asReceiveChannel().filterNotTo(channel) { it % 2 == mod }
+ }
+ assertEquals(testList.filterNot { it % 2 == mod }, c.toList())
+ }
+ }
+
+ @Test
+ fun testFilterNotNull() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(),
+ testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNull().toList())
+ }
+ }
+
+ @Test
+ fun testFilterNotNullToCollection() = runTest {
+ repeat(3) { mod ->
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(c)
+ assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(), c)
+ }
+ }
+
+ @Test
+ fun testFilterNotNullToSendChannel() = runTest {
+ repeat(3) { mod ->
+ val c = produce<Int> {
+ testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(channel)
+ }
+ assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(), c.toList())
+ }
+ }
+
+ @Test
+ fun testFilterIndexed() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod },
+ testList.asReceiveChannel().filterIndexed { index, _ -> index % 2 == mod }.toList())
+ }
+ }
+
+ @Test
+ fun testFilterIndexedToCollection() = runTest {
+ repeat(3) { mod ->
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().filterIndexedTo(c) { index, _ -> index % 2 == mod }
+ assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod }, c)
+ }
+ }
+
+ @Test
+ fun testFilterIndexedToChannel() = runTest {
+ repeat(3) { mod ->
+ val c = produce<Int> {
+ testList.asReceiveChannel().filterIndexedTo(channel) { index, _ -> index % 2 == mod }
+ }
+ assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod }, c.toList())
+ }
+ }
+
+ @Test
+ fun testTakeWhile() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.takeWhile { it % 2 != mod },
+ testList.asReceiveChannel().takeWhile { it % 2 != mod }.toList())
+ }
+ }
+
+ @Test
+ fun testToChannel() = runTest {
+ val c = produce<Int> {
+ testList.asReceiveChannel().toChannel(channel)
+ }
+ assertEquals(testList, c.toList())
+ }
+
+ @Test
+ fun testMapIndexed() = runTest {
+ assertEquals(testList.mapIndexed { index, i -> index + i },
+ testList.asReceiveChannel().mapIndexed { index, i -> index + i }.toList())
+ }
+
+ @Test
+ fun testMapIndexedToCollection() = runTest {
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().mapIndexedTo(c) { index, i -> index + i }
+ assertEquals(testList.mapIndexed { index, i -> index + i }, c)
+ }
+
+ @Test
+ fun testMapIndexedToSendChannel() = runTest {
+ val c = produce<Int> {
+ testList.asReceiveChannel().mapIndexedTo(channel) { index, i -> index + i }
+ }
+ assertEquals(testList.mapIndexed { index, i -> index + i }, c.toList())
+ }
+
+ @Test
+ fun testMapNotNull() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } },
+ testList.asReceiveChannel().mapNotNull { i -> i.takeIf { i % 2 == mod } }.toList())
+ }
+ }
+
+ @Test
+ fun testMapNotNullToCollection() = runTest {
+ repeat(3) { mod ->
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().mapNotNullTo(c) { i -> i.takeIf { i % 2 == mod } }
+ assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, c)
+ }
+ }
+
+ @Test
+ fun testMapNotNullToSendChannel() = runTest {
+ repeat(3) { mod ->
+ val c = produce<Int> {
+ testList.asReceiveChannel().mapNotNullTo(channel) { i -> i.takeIf { i % 2 == mod } }
+ }
+ assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, c.toList())
+ }
+ }
+
+ @Test
+ fun testMapIndexedNotNull() = runTest {
+ repeat(3) { mod ->
+ assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } },
+ testList.asReceiveChannel().mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }.toList())
+ }
+ }
+
+ @Test
+ fun testMapIndexedNotNullToCollection() = runTest {
+ repeat(3) { mod ->
+ val c = mutableListOf<Int>()
+ testList.asReceiveChannel().mapIndexedNotNullTo(c) { index, i -> index.takeIf { i % 2 == mod } }
+ assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, c)
+ }
+ }
+
+ @Test
+ fun testMapIndexedNotNullToSendChannel() = runTest {
+ repeat(3) { mod ->
+ val c = produce<Int> {
+ testList.asReceiveChannel().mapIndexedNotNullTo(channel) { index, i -> index.takeIf { i % 2 == mod } }
+ }
+ assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, c.toList())
+ }
+ }
+
+ @Test
+ fun testWithIndex() = runTest {
+ assertEquals(testList.withIndex().toList(), testList.asReceiveChannel().withIndex().toList())
+ }
+
+ @Test
+ fun testMaxBy() = runTest {
+ assertEquals(testList.maxBy { 10 - abs(it - 2) },
+ testList.asReceiveChannel().maxBy { 10 - abs(it - 2) })
+ }
+
+ @Test
+ fun testMaxWith() = runTest {
+ val cmp = compareBy<Int> { 10 - abs(it - 2) }
+ assertEquals(testList.maxWith(cmp),
+ testList.asReceiveChannel().maxWith(cmp))
+ }
+
+ @Test
+ fun testMinBy() = runTest {
+ assertEquals(testList.minBy { abs(it - 2) },
+ testList.asReceiveChannel().minBy { abs(it - 2) })
+ }
+
+ @Test
+ fun testMinWith() = runTest {
+ val cmp = compareBy<Int> { abs(it - 2) }
+ assertEquals(testList.minWith(cmp),
+ testList.asReceiveChannel().minWith(cmp))
+ }
+
+ @Test
+ fun testSumBy() = runTest {
+ assertEquals(testList.sumBy { it * 3 },
+ testList.asReceiveChannel().sumBy { it * 3 })
+ }
+
+ @Test
+ fun testSumByDouble() = runTest {
+ val expected = testList.sumByDouble { it * 3.0 }
+ val actual = testList.asReceiveChannel().sumByDouble { it * 3.0 }
+ assertEquals(expected, actual)
+ }
+
+ @Test
+ fun testRequireNoNulls() = runTest {
+ assertEquals(testList.requireNoNulls(), testList.asReceiveChannel().requireNoNulls().toList())
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt
new file mode 100644
index 0000000..4c04f8f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ConflatedBroadcastChannelTest : TestBase() {
+
+ @Test
+ fun testBasicScenario() = runTest {
+ expect(1)
+ val broadcast = ConflatedBroadcastChannel<String>()
+ assertTrue(exceptionFromNotInline { broadcast.value } is IllegalStateException)
+ assertNull(broadcast.valueOrNull)
+
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ val sub = broadcast.openSubscription()
+ assertNull(sub.poll())
+ expect(3)
+ assertEquals("one", sub.receive()) // suspends
+ expect(6)
+ assertEquals("two", sub.receive()) // suspends
+ expect(12)
+ sub.close()
+ expect(13)
+ }
+
+ expect(4)
+ broadcast.send("one") // does not suspend
+ assertEquals("one", broadcast.value)
+ assertEquals("one", broadcast.valueOrNull)
+ expect(5)
+ yield() // to receiver
+ expect(7)
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(8)
+ val sub = broadcast.openSubscription()
+ assertEquals("one", sub.receive()) // does not suspend
+ expect(9)
+ assertEquals("two", sub.receive()) // suspends
+ expect(14)
+ assertEquals("three", sub.receive()) // suspends
+ expect(17)
+ assertNull(sub.receiveOrNull()) // suspends until closed
+ expect(20)
+ sub.close()
+ expect(21)
+ }
+
+ expect(10)
+ broadcast.send("two") // does not suspend
+ assertEquals("two", broadcast.value)
+ assertEquals("two", broadcast.valueOrNull)
+ expect(11)
+ yield() // to both receivers
+ expect(15)
+ broadcast.send("three") // does not suspend
+ assertEquals("three", broadcast.value)
+ assertEquals("three", broadcast.valueOrNull)
+ expect(16)
+ yield() // to second receiver
+ expect(18)
+ broadcast.close()
+ assertTrue(exceptionFromNotInline { broadcast.value } is IllegalStateException)
+ assertNull(broadcast.valueOrNull)
+ expect(19)
+ yield() // to second receiver
+ assertTrue(exceptionFrom { broadcast.send("four") } is ClosedSendChannelException)
+ finish(22)
+ }
+
+ @Test
+ fun testInitialValueAndReceiveClosed() = runTest {
+ expect(1)
+ val broadcast = ConflatedBroadcastChannel(1)
+ assertEquals(1, broadcast.value)
+ assertEquals(1, broadcast.valueOrNull)
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ val sub = broadcast.openSubscription()
+ assertEquals(1, sub.receive())
+ expect(3)
+ assertTrue(exceptionFrom { sub.receive() } is ClosedReceiveChannelException) // suspends
+ expect(6)
+ }
+ expect(4)
+ broadcast.close()
+ expect(5)
+ yield() // to child
+ finish(7)
+ }
+
+ inline fun exceptionFrom(block: () -> Unit): Throwable? {
+ try {
+ block()
+ return null
+ } catch (e: Throwable) {
+ return e
+ }
+ }
+
+ // Ugly workaround for bug in JS compiler
+ fun exceptionFromNotInline(block: () -> Unit): Throwable? {
+ try {
+ block()
+ return null
+ } catch (e: Throwable) {
+ return e
+ }
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
new file mode 100644
index 0000000..1fd7413
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ConflatedChannelTest : TestBase() {
+
+ @Test
+ fun testBasicConflationOfferPoll() {
+ val q = ConflatedChannel<Int>()
+ assertNull(q.poll())
+ assertTrue(q.offer(1))
+ assertTrue(q.offer(2))
+ assertTrue(q.offer(3))
+ assertEquals(3, q.poll())
+ assertNull(q.poll())
+ }
+
+ @Test
+ fun testConflatedSend() = runTest {
+ val q = ConflatedChannel<Int>()
+ q.send(1)
+ q.send(2) // shall conflated previously sent
+ assertEquals(2, q.receiveOrNull())
+ }
+
+ @Test
+ fun testConflatedClose() = runTest {
+ val q = ConflatedChannel<Int>()
+ q.send(1)
+ q.close() // shall conflate sent item and become closed
+ assertNull(q.receiveOrNull())
+ }
+
+ @Test
+ fun testConflationSendReceive() = runTest {
+ val q = ConflatedChannel<Int>()
+ expect(1)
+ launch(coroutineContext) { // receiver coroutine
+ expect(4)
+ assertEquals(2, q.receive())
+ expect(5)
+ assertEquals(3, q.receive()) // this receive suspends
+ expect(8)
+ assertEquals(6, q.receive()) // last conflated value
+ expect(9)
+ }
+ expect(2)
+ q.send(1)
+ q.send(2) // shall conflate
+ expect(3)
+ yield() // to receiver
+ expect(6)
+ q.send(3) // send to the waiting receiver
+ q.send(4) // buffer
+ q.send(5) // conflate
+ q.send(6) // conflate again
+ expect(7)
+ yield() // to receiver
+ finish(10)
+ }
+
+ @Test
+ fun testConsumeAll() = runTest {
+ val q = ConflatedChannel<Int>()
+ expect(1)
+ for (i in 1..10) {
+ q.send(i) // stores as last
+ }
+ q.cancel()
+ check(q.isClosedForSend)
+ check(q.isClosedForReceive)
+ check(q.receiveOrNull() == null)
+ finish(2)
+ }
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
new file mode 100644
index 0000000..897801e
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.TestBase
+import kotlin.test.*
+
+class LinkedListChannelTest : TestBase() {
+
+ @Test
+ fun testBasic() = runTest {
+ val c = LinkedListChannel<Int>()
+ c.send(1)
+ check(c.offer(2))
+ c.send(3)
+ check(c.close())
+ check(!c.close())
+ assertEquals(1, c.receive())
+ assertEquals(2, c.poll())
+ assertEquals(3, c.receiveOrNull())
+ assertNull(c.receiveOrNull())
+ }
+
+ @Test
+ fun testConsumeAll() = runTest {
+ val q = LinkedListChannel<Int>()
+ for (i in 1..10) {
+ q.send(i) // buffers
+ }
+ q.cancel()
+ check(q.isClosedForSend)
+ check(q.isClosedForReceive)
+ check(q.receiveOrNull() == null)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt
new file mode 100644
index 0000000..6646aa6
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt
@@ -0,0 +1,55 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ProduceConsumeTest : TestBase() {
+
+ @Test
+ fun testRendezvous() = runTest {
+ testProducer(1)
+ }
+
+ @Test
+ fun testSmallBuffer() = runTest {
+ testProducer(1)
+ }
+
+ @Test
+ fun testMediumBuffer() = runTest {
+ testProducer(10)
+ }
+
+ @Test
+ fun testLargeMediumBuffer() = runTest {
+ testProducer(1000)
+ }
+
+ @Test
+ fun testUnlimited() = runTest {
+ testProducer(Channel.UNLIMITED)
+ }
+
+ private suspend fun testProducer(producerCapacity: Int) {
+ testProducer(1, producerCapacity)
+ testProducer(10, producerCapacity)
+ testProducer(100, producerCapacity)
+ }
+
+ private suspend fun testProducer(messages: Int, producerCapacity: Int) {
+ var sentAll = false
+ val producer = produce(coroutineContext, capacity = producerCapacity) {
+ for (i in 1..messages) {
+ send(i)
+ }
+ sentAll = true
+ }
+ var consumed = 0
+ for (x in producer) {
+ consumed++
+ }
+ assertTrue(sentAll)
+ assertEquals(messages, consumed)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
new file mode 100644
index 0000000..522f6d6
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ProduceTest : TestBase() {
+
+ @Test
+ fun testBasic() = runTest {
+ val c = produce(coroutineContext) {
+ expect(2)
+ send(1)
+ expect(3)
+ send(2)
+ expect(6)
+ }
+ expect(1)
+ check(c.receive() == 1)
+ expect(4)
+ check(c.receive() == 2)
+ expect(5)
+ check(c.receiveOrNull() == null)
+ finish(7)
+ }
+
+ @Test
+ fun testCancel() = runTest {
+ val c = produce(coroutineContext) {
+ expect(2)
+ send(1)
+ expect(3)
+ try {
+ send(2) // will get cancelled
+ } catch (e: Throwable) {
+ finish(7)
+ check(e is JobCancellationException && e.job == coroutineContext[Job])
+ throw e
+ }
+ expectUnreached()
+ }
+ expect(1)
+ check(c.receive() == 1)
+ expect(4)
+ c.cancel()
+ expect(5)
+ check(c.receiveOrNull() == null)
+ expect(6)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
new file mode 100644
index 0000000..6e1b2c3
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class RendezvousChannelTest : TestBase() {
+
+ @Test
+ fun testSimple() = runTest {
+ val q = RendezvousChannel<Int>()
+ check(q.isEmpty && q.isFull)
+ expect(1)
+ val sender = launch(coroutineContext) {
+ expect(4)
+ q.send(1) // suspend -- the first to come to rendezvous
+ expect(7)
+ q.send(2) // does not suspend -- receiver is there
+ expect(8)
+ }
+ expect(2)
+ val receiver = launch(coroutineContext) {
+ expect(5)
+ check(q.receive() == 1) // does not suspend -- sender was there
+ expect(6)
+ check(q.receive() == 2) // suspends
+ expect(9)
+ }
+ expect(3)
+ sender.join()
+ receiver.join()
+ check(q.isEmpty && q.isFull)
+ finish(10)
+ }
+
+ @Test
+ fun testClosedReceiveOrNull() = runTest {
+ val q = RendezvousChannel<Int>()
+ check(q.isEmpty && q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ assertEquals(42, q.receiveOrNull())
+ expect(4)
+ assertEquals(null, q.receiveOrNull())
+ expect(6)
+ }
+ expect(2)
+ q.send(42)
+ expect(5)
+ q.close()
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+ yield()
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+ finish(7)
+ }
+
+ @Test
+ fun testClosedExceptions() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(4)
+ try { q.receive() }
+ catch (e: ClosedReceiveChannelException) {
+ expect(5)
+ }
+ }
+ expect(2)
+ q.close()
+ expect(3)
+ yield()
+ expect(6)
+ try { q.send(42) }
+ catch (e: ClosedSendChannelException) {
+ finish(7)
+ }
+ }
+
+ @Test
+ fun testOfferAndPool() = runTest {
+ val q = RendezvousChannel<Int>()
+ assertFalse(q.offer(1))
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ assertEquals(null, q.poll())
+ expect(4)
+ assertEquals(2, q.receive())
+ expect(7)
+ assertEquals(null, q.poll())
+ yield()
+ expect(9)
+ assertEquals(3, q.poll())
+ expect(10)
+ }
+ expect(2)
+ yield()
+ expect(5)
+ assertTrue(q.offer(2))
+ expect(6)
+ yield()
+ expect(8)
+ q.send(3)
+ finish(11)
+ }
+
+ @Test
+ fun testIteratorClosed() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ q.close()
+ expect(4)
+ }
+ expect(2)
+ for (x in q) {
+ expectUnreached()
+ }
+ finish(5)
+ }
+
+ @Test
+ fun testIteratorOne() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ q.send(1)
+ expect(4)
+ q.close()
+ expect(5)
+ }
+ expect(2)
+ for (x in q) {
+ expect(6)
+ assertEquals(1, x)
+ }
+ finish(7)
+ }
+
+ @Test
+ fun testIteratorOneWithYield() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ q.send(1) // will suspend
+ expect(6)
+ q.close()
+ expect(7)
+ }
+ expect(2)
+ yield() // yield to sender coroutine right before starting for loop
+ expect(4)
+ for (x in q) {
+ expect(5)
+ assertEquals(1, x)
+ }
+ finish(8)
+ }
+
+ @Test
+ fun testIteratorTwo() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ q.send(1)
+ expect(4)
+ q.send(2)
+ expect(7)
+ q.close()
+ expect(8)
+ }
+ expect(2)
+ for (x in q) {
+ when (x) {
+ 1 -> expect(5)
+ 2 -> expect(6)
+ else -> expectUnreached()
+ }
+ }
+ finish(9)
+ }
+
+ @Test
+ fun testIteratorTwoWithYield() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(3)
+ q.send(1) // will suspend
+ expect(6)
+ q.send(2)
+ expect(7)
+ q.close()
+ expect(8)
+ }
+ expect(2)
+ yield() // yield to sender coroutine right before starting for loop
+ expect(4)
+ for (x in q) {
+ when (x) {
+ 1 -> expect(5)
+ 2 -> expect(9)
+ else -> expectUnreached()
+ }
+ }
+ finish(10)
+ }
+
+ @Test
+ fun testSuspendSendOnClosedChannel() = runTest {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(coroutineContext) {
+ expect(4)
+ q.send(42) // suspend
+ expect(11)
+ }
+ expect(2)
+ launch(coroutineContext) {
+ expect(5)
+ q.close()
+ expect(6)
+ }
+ expect(3)
+ yield() // to sender
+ expect(7)
+ yield() // try to resume sender (it will not resume despite the close!)
+ expect(8)
+ assertEquals(42, q.receiveOrNull())
+ expect(9)
+ assertNull(q.receiveOrNull())
+ expect(10)
+ yield() // to sender, it was resumed!
+ finish(12)
+ }
+
+ class BadClass {
+ override fun equals(other: Any?): Boolean = error("equals")
+ override fun hashCode(): Int = error("hashCode")
+ override fun toString(): String = error("toString")
+ }
+
+ @Test
+ fun testProduceBadClass() = runTest {
+ val bad = BadClass()
+ val c = produce(coroutineContext) {
+ expect(1)
+ send(bad)
+ }
+ assertTrue(c.receive() === bad)
+ finish(2)
+ }
+
+ @Test
+ fun testConsumeAll() = runTest {
+ val q = RendezvousChannel<Int>()
+ for (i in 1..10) {
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(i)
+ q.send(i) // suspends
+ expectUnreached() // will get cancelled by cancel
+ }
+ }
+ expect(11)
+ q.cancel()
+ check(q.isClosedForSend)
+ check(q.isClosedForReceive)
+ check(q.receiveOrNull() == null)
+ finish(12)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt
new file mode 100644
index 0000000..c85f541
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt
@@ -0,0 +1,46 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class SendReceiveStressTest : TestBase() {
+
+ // Emulate parametrized by hand :(
+
+ @Test
+ fun testArrayChannel() = runTest {
+ testStress(ArrayChannel(2))
+ }
+
+ @Test
+ fun testLinkedListChannel() = runTest {
+ testStress(LinkedListChannel())
+ }
+
+ @Test
+ fun testRendezvousChannel() = runTest {
+ testStress(RendezvousChannel())
+ }
+
+ private suspend fun testStress(channel: Channel<Int>) {
+ val n = 1_000 // Do not increase, otherwise node.js will fail with timeout :(
+ val sender = launch(coroutineContext) {
+ for (i in 1..n) {
+ channel.send(i)
+ }
+ expect(2)
+ }
+ val receiver = launch(coroutineContext) {
+ for (i in 1..n) {
+ val next = channel.receive()
+ check(next == i)
+ }
+ expect(3)
+ }
+ expect(1)
+ sender.join()
+ receiver.join()
+ finish(4)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
new file mode 100644
index 0000000..69e939f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
@@ -0,0 +1,35 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class SimpleSendReceiveTest : TestBase() {
+
+ @Test
+ fun testSimpleSendReceive() = runTest {
+ // Parametrized common test :(
+ TestChannelKind.values().forEach { kind -> testSendReceive(kind, 100) }
+ }
+
+ private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) {
+ val channel = kind.create()
+
+ launch(coroutineContext) {
+ repeat(iterations) { channel.send(it) }
+ channel.close()
+ }
+ var expected = 0
+ for (x in channel) {
+ if (!kind.isConflated) {
+ assertEquals(expected++, x)
+ } else {
+ assertTrue(x >= expected)
+ expected = x + 1
+ }
+ }
+ if (!kind.isConflated) {
+ assertEquals(iterations, expected)
+ }
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
new file mode 100644
index 0000000..60dbb97
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+enum class TestBroadcastChannelKind {
+ ARRAY_1 {
+ override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel(1)
+ override fun toString(): String = "ArrayBroadcastChannel(1)"
+ },
+ ARRAY_10 {
+ override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel(10)
+ override fun toString(): String = "ArrayBroadcastChannel(10)"
+ },
+ CONFLATED {
+ override fun <T> create(): BroadcastChannel<T> = ConflatedBroadcastChannel()
+ override fun toString(): String = "ConflatedBroadcastChannel"
+ override val isConflated: Boolean get() = true
+ }
+ ;
+
+ abstract fun <T> create(): BroadcastChannel<T>
+ open val isConflated: Boolean get() = false
+}
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
new file mode 100644
index 0000000..c3ac904
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.selects.SelectClause1
+
+enum class TestChannelKind {
+ RENDEZVOUS {
+ override fun create(): Channel<Int> = RendezvousChannel()
+ override fun toString(): String = "RendezvousChannel"
+ },
+ ARRAY_1 {
+ override fun create(): Channel<Int> = ArrayChannel(1)
+ override fun toString(): String = "ArrayChannel(1)"
+ },
+ ARRAY_10 {
+ override fun create(): Channel<Int> = ArrayChannel(8)
+ override fun toString(): String = "ArrayChannel(8)"
+ },
+ LINKED_LIST {
+ override fun create(): Channel<Int> = LinkedListChannel()
+ override fun toString(): String = "LinkedListChannel"
+ },
+ CONFLATED {
+ override fun create(): Channel<Int> = ConflatedChannel()
+ override fun toString(): String = "ConflatedChannel"
+ override val isConflated: Boolean get() = true
+ },
+ ARRAY_BROADCAST_1 {
+ override fun create(): Channel<Int> = ChannelViaBroadcast(ArrayBroadcastChannel<Int>(1))
+ override fun toString(): String = "ArrayBroadcastChannel(1)"
+ },
+ ARRAY_BROADCAST_10 {
+ override fun create(): Channel<Int> = ChannelViaBroadcast(ArrayBroadcastChannel<Int>(10))
+ override fun toString(): String = "ArrayBroadcastChannel(10)"
+ },
+ CONFLATED_BROADCAST {
+ override fun create(): Channel<Int> = ChannelViaBroadcast(ConflatedBroadcastChannel<Int>())
+ override fun toString(): String = "ConflatedBroadcastChannel"
+ override val isConflated: Boolean get() = true
+ }
+ ;
+
+ abstract fun create(): Channel<Int>
+ open val isConflated: Boolean get() = false
+}
+
+private class ChannelViaBroadcast<E>(
+ private val broadcast: BroadcastChannel<E>
+): Channel<E>, SendChannel<E> by broadcast {
+ val sub = broadcast.openSubscription()
+
+ override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
+ override val isEmpty: Boolean get() = sub.isEmpty
+
+ // Workaround for KT-23094
+ override suspend fun send(element: E) = broadcast.send(element)
+
+ override suspend fun receive(): E = sub.receive()
+ override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
+ override fun poll(): E? = sub.poll()
+ override fun iterator(): ChannelIterator<E> = sub.iterator()
+ override fun cancel(cause: Throwable?): Boolean = sub.cancel(cause)
+ override val onReceive: SelectClause1<E>
+ get() = sub.onReceive
+ override val onReceiveOrNull: SelectClause1<E?>
+ get() = sub.onReceiveOrNull
+}