Introduced ReceiveChannel.cancel method;
all operators on ReceiveChannel fully consume the original channel
using a helper consume extension, which is reflected in docs;
removed `suspend` modifier from intermediate channel operators;
consistently renamed channel type param to <E>;
added two versions for xxxTo fun -- with MutableList & SendChannel;
added tests for all channel operators;
dropped/deprecated ActorJob/ProducerJob, fixes #127
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 17f93c9..2f4c841 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -898,7 +898,7 @@
 
     protected open val hasCancellingState: Boolean get() = false
 
-    public final override fun cancel(cause: Throwable?): Boolean =
+    public override fun cancel(cause: Throwable?): Boolean =
         if (hasCancellingState)
             makeCancelling(cause) else
             makeCancelled(cause)
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index 529e638..367a3b2 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -398,6 +398,11 @@
             remove()
         }
 
+        override fun resumeSendClosed(closed: Closed<*>) {
+            if (select.trySelect(null))
+                select.resumeSelectCancellableWithException(closed.sendException)
+        }
+
         override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
     }
 
@@ -407,6 +412,7 @@
         override val pollResult: Any? get() = element
         override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
         override fun completeResumeSend(token: Any) { check(token === SEND_RESUMED) }
+        override fun resumeSendClosed(closed: Closed<*>) {}
     }
 }
 
@@ -567,6 +573,24 @@
         return if (result === POLL_FAILED) null else receiveOrNullResult(result)
     }
 
+    override fun cancel(cause: Throwable?): Boolean =
+        close(cause).also {
+            cleanupSendQueueOnCancel()
+        }
+
+    // Note: this function is invoked when the channel is already closed
+    protected open fun cleanupSendQueueOnCancel() {
+        val closed = closedForSend ?: error("Cannot happen")
+        while (true) {
+            val send = takeFirstSendOrPeekClosed() ?: error("Cannot happen")
+            if (send is Closed<*>) {
+                check(send === closed)
+                return // cleaned
+            }
+            send.resumeSendClosed(closed)
+        }
+    }
+
     public final override fun iterator(): ChannelIterator<E> = Itr(this)
 
     // ------ registerSelectReceive ------
@@ -909,6 +933,7 @@
     val pollResult: Any? // E | Closed
     fun tryResumeSend(idempotent: Any?): Any?
     fun completeResumeSend(token: Any)
+    fun resumeSendClosed(closed: Closed<*>)
 }
 
 /**
@@ -922,7 +947,7 @@
 }
 
 /**
- * Represents closed channel.
+ * Represents a sender for a specific element.
  * @suppress **This is unstable API and it is subject to change.**
  */
 @Suppress("UNCHECKED_CAST")
@@ -932,6 +957,7 @@
 ) : LockFreeLinkedListNode(), Send {
     override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
     override fun completeResumeSend(token: Any) = cont.completeResume(token)
+    override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
     override fun toString(): String = "SendElement($pollResult)[$cont]"
 }
 
@@ -951,6 +977,7 @@
     override fun completeResumeSend(token: Any) { check(token === CLOSE_RESUMED) }
     override fun tryResumeReceive(value: E, idempotent: Any?): Any? = CLOSE_RESUMED
     override fun completeResumeReceive(token: Any) { check(token === CLOSE_RESUMED) }
+    override fun resumeSendClosed(closed: Closed<*>) = error("Should be never invoked")
     override fun toString(): String = "Closed[$closeCause]"
 }
 
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
index 5fc7390..e0a72fa 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt
@@ -37,14 +37,11 @@
 }
 
 /**
- * Return type for [actor] coroutine builder.
+ * @suppress **Deprecated**: Use `SendChannel`.
  */
-public interface ActorJob<in E> : Job, SendChannel<E> {
-    /**
-     * A reference to the mailbox channel that this coroutine is receiving messages from.
-     * All the [SendChannel] functions on this interface delegate to
-     * the channel instance returned by this function.
-     */
+@Deprecated(message = "Use `SendChannel`", replaceWith = ReplaceWith("SendChannel"))
+interface ActorJob<in E> : SendChannel<E> {
+    @Deprecated(message = "Use SendChannel itself")
     val channel: SendChannel<E>
 }
 
@@ -87,7 +84,7 @@
     capacity: Int = 0,
     start: CoroutineStart = CoroutineStart.DEFAULT,
     block: suspend ActorScope<E>.() -> Unit
-): ActorJob<E> {
+): SendChannel<E> {
     val newContext = newCoroutineContext(context)
     val channel = Channel<E>(capacity)
     val coroutine = if (start.isLazy)
@@ -109,8 +106,6 @@
     channel: Channel<E>,
     private val block: suspend ActorScope<E>.() -> Unit
 ) : ActorCoroutine<E>(parentContext, channel, active = false), SelectClause2<E, SendChannel<E>> {
-    override val channel: Channel<E> get() = this
-
     override fun onStart() {
         block.startCoroutineCancellable(this, this)
     }
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
index 34e1db7..909337f 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
@@ -209,10 +209,10 @@
         override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
         override val isBufferFull: Boolean get() = error("Should not be used")
 
-        override fun close() {
-            if (close(cause = null))
-                broadcastChannel.updateHead(removeSub = this)
-        }
+        override fun cancel(cause: Throwable?): Boolean =
+            close(cause).also { closed ->
+                if (closed) broadcastChannel.updateHead(removeSub = this)
+            }
 
         // returns true if subHead was updated and broadcast channel's head must be checked
         // this method is lock-free (it never waits on lock)
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
index 4d2c5c3..8ce9008 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -231,4 +231,18 @@
             send!!.completeResumeSend(token!!)
         return result
     }
+
+    // Note: this function is invoked when the channel is already closed
+    override fun cleanupSendQueueOnCancel() {
+        // clear buffer first
+        lock.withLock {
+            repeat(size) {
+                buffer[head] = 0
+                head = (head + 1) % capacity
+            }
+            size = 0
+        }
+        // then clean all queued senders
+        super.cleanupSendQueueOnCancel()
+    }
 }
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt
index 0555139..e071aa2 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt
@@ -76,10 +76,12 @@
 /**
  * Return type for [BroadcastChannel.openSubscription] that can be used to [receive] elements from the
  * open subscription and to [close] it to unsubscribe.
+ *
+ * Note that invoking [cancel] also closes the subscription.
  */
 public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
     /**
-     * Closes this subscription.
+     * Closes this subscription. This is a synonym for [cancel].
      */
-    public override fun close()
+    public override fun close() { cancel() }
 }
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
index e21acd6..912f87e 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -87,7 +87,9 @@
     /**
      * Closes this channel with an optional exceptional [cause].
      * This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
-     * Conceptually, its sends a special "close token" over this channel. Immediately after invocation of this function
+     * Conceptually, it sends a special "close token" over this channel.
+     *
+     * Immediately after invocation of this function
      * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
      * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
      * are received.
@@ -192,6 +194,21 @@
      * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
      */
     public operator fun iterator(): ChannelIterator<E>
+
+    /**
+     * Cancels reception of remaining elements from this channel. This function closes the channel with
+     * the specified cause (unless it was already closed) and removes all buffered sent elements from it.
+     * This function returns `true` if the channel was not closed previously, or `false` otherwise.
+     *
+     * Immediately after invocation of this function [isClosedForReceive] and
+     * [isClosedForSend][SendChannel.isClosedForSend]
+     * on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
+     * afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
+     * [ClosedReceiveChannelException] if it was cancelled without a cause.
+     * A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
+     * receive on a failed channel throw the specified [cause] exception.
+     */
+    public fun cancel(cause: Throwable? = null): Boolean
 }
 
 /**
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
index a13da23..bbaa09b 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
@@ -17,18 +17,22 @@
 package kotlinx.coroutines.experimental.channels
 
 import kotlinx.coroutines.experimental.AbstractCoroutine
-import kotlinx.coroutines.experimental.JobSupport
 import kotlinx.coroutines.experimental.handleCoroutineException
 import kotlin.coroutines.experimental.CoroutineContext
 
 internal open class ChannelCoroutine<E>(
     parentContext: CoroutineContext,
-    open val channel: Channel<E>,
+    channel: Channel<E>,
     active: Boolean
 ) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by channel {
-    override fun afterCompletion(state: Any?, mode: Int) {
-        val cause = (state as? JobSupport.CompletedExceptionally)?.cause
-        if (!channel.close(cause) && cause != null)
+    val channel: Channel<E>
+        get() = this
+
+    override fun onCancellation(exceptionally: CompletedExceptionally?) {
+        val cause = exceptionally?.cause
+        if (!close(cause) && cause != null)
             handleCoroutineException(context, cause)
     }
+
+    override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
 }
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
index 3ff1d7d..390a80d 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
@@ -16,1261 +16,13 @@
 
 package kotlinx.coroutines.experimental.channels
 
-import kotlinx.coroutines.experimental.CommonPool
 import kotlinx.coroutines.experimental.Unconfined
 import kotlinx.coroutines.experimental.runBlocking
 import kotlin.coroutines.experimental.CoroutineContext
 
 internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
 
-/**
- * Creates a [ProducerJob] to read all element of the [Iterable].
- */
-public fun <E> Iterable<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> = produce(context) {
-    for (element in this@asReceiveChannel)
-        send(element)
-}
-
-/**
- * Creates an [ActorJob] to insert elements in this [MutableCollection].
- */
-public fun <E> MutableCollection<E>.asSendChannel(context: CoroutineContext = Unconfined): SendChannel<E> = actor(context) {
-    for (element in channel)
-        this@asSendChannel += element
-}
-
-/**
- * Creates a [Sequence] instance that wraps the original [ReceiveChannel] returning its entries when being emitted.
- */
-public fun <E : Any> ReceiveChannel<E>.asSequence(): Sequence<E> =
-        generateSequence {
-            runBlocking {
-                receiveOrNull()
-            }
-        }
-
-/**
- * Performs the given [action] for each received element.
- */
-public inline suspend fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) {
-    for (element in this) action(element)
-}
-
-/**
- * @suppress: **Deprecated**: binary compatibility with old code
- */
-@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
-public suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit) =
-    consumeEach { action(it) }
-
-/**
- * Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
- */
-public inline suspend fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit) {
-    openSubscription().use { channel ->
-        for (x in channel) action(x)
-    }
-}
-
-/**
- * @suppress: **Deprecated**: binary compatibility with old code
- */
-@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
-public suspend fun <E> BroadcastChannel<E>.consumeEach(action: suspend (E) -> Unit) =
-    consumeEach { action(it) }
-
-/**
- * Performs the given [action] for each received element.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <E> ReceiveChannel<E>.consumeEachIndexed(action: suspend (IndexedValue<E>) -> Unit) {
-    var index = 0
-    for (element in this) action(IndexedValue(index++, element))
-}
-
-/**
- * Removes at least minElements and at most maxElements from this channel and adds them to the given destination collection.
- */
-public suspend fun <T : Any?, E : T> ReceiveChannel<E>.drainTo(destination: MutableCollection<T>,
-                                                               minElements: Int = 0,
-                                                               maxElements: Int = Integer.MAX_VALUE) {
-    require(minElements >= 0) { "minElements cannot be negative" }
-    require(maxElements >= minElements) { "maxElements cannot be lesser than minElements" }
-    repeat(minElements) {
-        destination += receive()
-    }
-    repeat(maxElements - minElements) {
-        destination += poll() ?: return
-    }
-}
-
-/**
- * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this channel.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.elementAt(index: Int): T {
-    return elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") }
-}
-
-/**
- * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this channel.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.elementAtOrElse(index: Int, defaultValue: suspend (Int) -> T): T {
-    if (index < 0)
-        return defaultValue(index)
-    var count = 0
-    for (element in this) {
-        if (index == count++)
-            return element
-    }
-    return defaultValue(index)
-}
-
-/**
- * Returns an element at the given [index] or `null` if the [index] is out of bounds of this channel.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.elementAtOrNull(index: Int): T? {
-    if (index < 0)
-        return null
-    var count = 0
-    for (element in this) {
-        if (index == count++)
-            return element
-    }
-    return null
-}
-
-/**
- * Returns the first element matching the given [predicate], or `null` if no such element was found.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.find(predicate: suspend (T) -> Boolean): T? {
-    return firstOrNull(predicate)
-}
-
-/**
- * Returns the last element matching the given [predicate], or `null` if no such element was found.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.findLast(predicate: suspend (T) -> Boolean): T? {
-    return lastOrNull(predicate)
-}
-
-/**
- * Returns first element.
- * @throws [NoSuchElementException] if the channel is empty.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.first(): T {
-    val iterator = iterator()
-    if (!iterator.hasNext())
-        throw NoSuchElementException("ReceiveChannel is empty.")
-    return iterator.next()
-}
-
-/**
- * Returns the first element matching the given [predicate].
- * @throws [NoSuchElementException] if no such element is found.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.first(predicate: suspend (T) -> Boolean): T {
-    for (element in this) if (predicate(element)) return element
-    throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
-}
-
-/**
- * Returns the first element, or `null` if the channel is empty.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.firstOrNull(): T? {
-    val iterator = iterator()
-    if (!iterator.hasNext())
-        return null
-    return iterator.next()
-}
-
-/**
- * Returns the first element matching the given [predicate], or `null` if element was not found.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
-    for (element in this) if (predicate(element)) return element
-    return null
-}
-
-/**
- * Returns first index of [element], or -1 if the channel does not contain element.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.indexOf(element: T): Int {
-    var index = 0
-    for (item in this) {
-        if (element == item)
-            return index
-        index++
-    }
-    return -1
-}
-
-/**
- * Returns index of the first element matching the given [predicate], or -1 if the channel does not contain such element.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.indexOfFirst(predicate: suspend (T) -> Boolean): Int {
-    var index = 0
-    for (item in this) {
-        if (predicate(item))
-            return index
-        index++
-    }
-    return -1
-}
-
-/**
- * Returns index of the last element matching the given [predicate], or -1 if the channel does not contain such element.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.indexOfLast(predicate: suspend (T) -> Boolean): Int {
-    var lastIndex = -1
-    var index = 0
-    for (item in this) {
-        if (predicate(item))
-            lastIndex = index
-        index++
-    }
-    return lastIndex
-}
-
-/**
- * Returns the last element.
- * @throws [NoSuchElementException] if the channel is empty.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.last(): T {
-    val iterator = iterator()
-    if (!iterator.hasNext())
-        throw NoSuchElementException("ReceiveChannel is empty.")
-    var last = iterator.next()
-    while (iterator.hasNext())
-        last = iterator.next()
-    return last
-}
-
-/**
- * Returns the last element matching the given [predicate].
- * @throws [NoSuchElementException] if no such element is found.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.last(predicate: suspend (T) -> Boolean): T {
-    var last: T? = null
-    var found = false
-    for (element in this) {
-        if (predicate(element)) {
-            last = element
-            found = true
-        }
-    }
-    if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
-    @Suppress("UNCHECKED_CAST")
-    return last as T
-}
-
-/**
- * Returns last index of [element], or -1 if the channel does not contain element.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.lastIndexOf(element: T): Int {
-    var lastIndex = -1
-    var index = 0
-    for (item in this) {
-        if (element == item)
-            lastIndex = index
-        index++
-    }
-    return lastIndex
-}
-
-/**
- * Returns the last element, or `null` if the channel is empty.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.lastOrNull(): T? {
-    val iterator = iterator()
-    if (!iterator.hasNext())
-        return null
-    var last = iterator.next()
-    while (iterator.hasNext())
-        last = iterator.next()
-    return last
-}
-
-/**
- * Returns the last element matching the given [predicate], or `null` if no such element was found.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.lastOrNull(predicate: suspend (T) -> Boolean): T? {
-    var last: T? = null
-    for (element in this) {
-        if (predicate(element)) {
-            last = element
-        }
-    }
-    return last
-}
-
-/**
- * Returns the single element, or throws an exception if the channel is empty or has more than one element.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.single(): T {
-    val iterator = iterator()
-    if (!iterator.hasNext())
-        throw NoSuchElementException("ReceiveChannel is empty.")
-    val single = iterator.next()
-    if (iterator.hasNext())
-        throw IllegalArgumentException("ReceiveChannel has more than one element.")
-    return single
-}
-
-/**
- * Returns the single element matching the given [predicate], or throws exception if there is no or more than one matching element.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.single(predicate: suspend (T) -> Boolean): T {
-    var single: T? = null
-    var found = false
-    for (element in this) {
-        if (predicate(element)) {
-            if (found) throw IllegalArgumentException("ReceiveChannel contains more than one matching element.")
-            single = element
-            found = true
-        }
-    }
-    if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
-    @Suppress("UNCHECKED_CAST")
-    return single as T
-}
-
-/**
- * Returns single element, or `null` if the channel is empty or has more than one element.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.singleOrNull(): T? {
-    val iterator = iterator()
-    if (!iterator.hasNext())
-        return null
-    val single = iterator.next()
-    if (iterator.hasNext())
-        return null
-    return single
-}
-
-/**
- * Returns the single element matching the given [predicate], or `null` if element was not found or more than one element was found.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.singleOrNull(predicate: suspend (T) -> Boolean): T? {
-    var single: T? = null
-    var found = false
-    for (element in this) {
-        if (predicate(element)) {
-            if (found) return null
-            single = element
-            found = true
-        }
-    }
-    if (!found) return null
-    return single
-}
-
-/**
- * Returns a channel containing all elements except first [n] elements.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T> ReceiveChannel<T>.drop(n: Int): ReceiveChannel<T> = produce(CommonPool) {
-    require(n >= 0) { "Requested element count $n is less than zero." }
-    var remaining: Int = n
-    if (remaining > 0)
-        for (element in this@drop) {
-            remaining--
-            if (remaining == 0)
-                break
-        }
-    for (element in this@drop) {
-        send(element)
-    }
-}
-
-/**
- * Returns a channel containing all elements except first elements that satisfy the given [predicate].
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T> ReceiveChannel<T>.dropWhile(predicate: suspend (T) -> Boolean): ReceiveChannel<T> = produce(Unconfined) {
-    for (element in this@dropWhile) {
-        if (!predicate(element))
-            break
-    }
-    for (element in this@dropWhile) {
-        send(element)
-    }
-}
-
-/**
- * Returns a channel containing only elements matching the given [predicate].
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T> ReceiveChannel<T>.filter(predicate: suspend (T) -> Boolean): ReceiveChannel<T> = produce(Unconfined) {
-    for (element in this@filter) {
-        if (predicate(element))
-            send(element)
-    }
-}
-
-/**
- * Returns a channel containing only elements matching the given [predicate].
- * @param [predicate] function that takes the index of an element and the element itself
- * and returns the result of predicate evaluation on the element.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T> ReceiveChannel<T>.filterIndexed(predicate: suspend (index: Int, T) -> Boolean): ReceiveChannel<T> = produce(Unconfined) {
-    var index = 0
-    for (element in this@filterIndexed) {
-        if (predicate(index++, element))
-            send(element)
-    }
-}
-
-/**
- * Appends all elements matching the given [predicate] to the given [destination].
- * @param [predicate] function that takes the index of an element and the element itself
- * and returns the result of predicate evaluation on the element.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T, C : MutableCollection<in T>> ReceiveChannel<T>.filterIndexedTo(destination: C, predicate: suspend (index: Int, T) -> Boolean): C {
-    consumeEachIndexed { (index, element) ->
-        if (predicate(index, element)) destination.add(element)
-    }
-    return destination
-}
-
-/**
- * Returns a channel containing all elements not matching the given [predicate].
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T> ReceiveChannel<T>.filterNot(predicate: suspend (T) -> Boolean): ReceiveChannel<T> = filter() { !predicate(it) }
-
-
-/**
- * Returns a channel containing all elements that are not `null`.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-@Suppress("UNCHECKED_CAST")
-public suspend fun <T : Any> ReceiveChannel<T?>.filterNotNull(): ReceiveChannel<T> = filter { it != null } as ReceiveChannel<T>
-
-/**
- * Appends all elements that are not `null` to the given [destination].
- *
- * The operation is _terminal_.
- */
-public suspend fun <C : MutableCollection<in T>, T : Any> ReceiveChannel<T?>.filterNotNullTo(destination: C): C {
-    for (element in this) if (element != null) destination.add(element)
-    return destination
-}
-
-/**
- * Appends all elements not matching the given [predicate] to the given [destination].
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, C : MutableCollection<in T>> ReceiveChannel<T>.filterNotTo(destination: C, predicate: suspend (T) -> Boolean): C {
-    for (element in this) if (!predicate(element)) destination.add(element)
-    return destination
-}
-
-/**
- * Appends all elements matching the given [predicate] to the given [destination].
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, C : MutableCollection<in T>> ReceiveChannel<T>.filterTo(destination: C, predicate: suspend (T) -> Boolean): C {
-    for (element in this) if (predicate(element)) destination.add(element)
-    return destination
-}
-
-/**
- * Returns a channel containing first [n] elements.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T> ReceiveChannel<T>.take(n: Int): ReceiveChannel<T> = produce(CommonPool) {
-    if (n == 0) return@produce
-    require(n >= 0) { "Requested element count $n is less than zero." }
-
-    var remaining: Int = n
-    for (element in this@take) {
-        send(element)
-        remaining--
-        if (remaining == 0)
-            return@produce
-    }
-}
-
-/**
- * Returns a channel containing first elements satisfying the given [predicate].
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T> ReceiveChannel<T>.takeWhile(predicate: suspend (T) -> Boolean): ReceiveChannel<T> = produce(Unconfined) {
-    for (element in this@takeWhile) {
-        if (!predicate(element)) return@produce
-        send(element)
-    }
-}
-
-/**
- * Returns a [Map] containing key-value pairs provided by [transform] function
- * applied to elements of the given channel.
- *
- * If any of two pairs would have the same key the last one gets added to the map.
- *
- * The returned map preserves the entry iteration order of the original channel.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, K, V> ReceiveChannel<T>.associate(transform: suspend (T) -> Pair<K, V>): Map<K, V> {
-    return associateTo(LinkedHashMap<K, V>(), transform)
-}
-
-/**
- * Returns a [Map] containing the elements from the given channel indexed by the key
- * returned from [keySelector] function applied to each element.
- *
- * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
- *
- * The returned map preserves the entry iteration order of the original channel.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, K> ReceiveChannel<T>.associateBy(keySelector: suspend (T) -> K): Map<K, T> {
-    return associateByTo(LinkedHashMap<K, T>(), keySelector)
-}
-
-/**
- * Returns a [Map] containing the values provided by [valueTransform] and indexed by [keySelector] functions applied to elements of the given channel.
- *
- * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
- *
- * The returned map preserves the entry iteration order of the original channel.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, K, V> ReceiveChannel<T>.associateBy(keySelector: suspend (T) -> K, valueTransform: suspend (T) -> V): Map<K, V> {
-    return associateByTo(LinkedHashMap<K, V>(), keySelector, valueTransform)
-}
-
-/**
- * Populates and returns the [destination] mutable map with key-value pairs,
- * where key is provided by the [keySelector] function applied to each element of the given channel
- * and value is the element itself.
- *
- * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, K, M : MutableMap<in K, in T>> ReceiveChannel<T>.associateByTo(destination: M, keySelector: suspend (T) -> K): M {
-    for (element in this) {
-        destination.put(keySelector(element), element)
-    }
-    return destination
-}
-
-/**
- * Populates and returns the [destination] mutable map with key-value pairs,
- * where key is provided by the [keySelector] function and
- * and value is provided by the [valueTransform] function applied to elements of the given channel.
- *
- * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, K, V, M : MutableMap<in K, in V>> ReceiveChannel<T>.associateByTo(destination: M, keySelector: suspend (T) -> K, valueTransform: suspend (T) -> V): M {
-    for (element in this) {
-        destination.put(keySelector(element), valueTransform(element))
-    }
-    return destination
-}
-
-/**
- * Populates and returns the [destination] mutable map with key-value pairs
- * provided by [transform] function applied to each element of the given channel.
- *
- * If any of two pairs would have the same key the last one gets added to the map.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, K, V, M : MutableMap<in K, in V>> ReceiveChannel<T>.associateTo(destination: M, transform: suspend (T) -> Pair<K, V>): M {
-    for (element in this) {
-        destination += transform(element)
-    }
-    return destination
-}
-
-/**
- * Appends all elements to the given [destination] collection.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T, C : MutableCollection<in T>> ReceiveChannel<T>.toCollection(destination: C): C {
-    for (item in this) {
-        destination.add(item)
-    }
-    return destination
-}
-
-/**
- * Returns a [List] containing all elements.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.toList(): List<T> {
-    return this.toMutableList()
-}
-
-/**
- * Returns a [Map] filled with all elements of this channel.
- *
- * The operation is _terminal_.
- */
-public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> {
-    return toMap(LinkedHashMap<K, V>())
-}
-
-/**
- * Returns a [MutableMap] filled with all elements of this channel.
- *
- * The operation is _terminal_.
- */
-public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
-    consumeEach {
-        destination += it
-    }
-    return destination
-}
-
-/**
- * Returns a [MutableList] filled with all elements of this channel.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.toMutableList(): MutableList<T> {
-    return toCollection(ArrayList<T>())
-}
-
-/**
- * Returns a [Set] of all elements.
- *
- * The returned set preserves the element iteration order of the original channel.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.toSet(): Set<T> {
-    return toCollection(LinkedHashSet<T>())
-}
-
-/**
- * Returns a single channel of all elements from results of [transform] function being invoked on each element of original channel.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T, R> ReceiveChannel<T>.flatMap(transform: suspend (T) -> ReceiveChannel<R>): ReceiveChannel<R> = produce(Unconfined) {
-    for (element in this@flatMap) {
-        for (sub in transform(element)) {
-            send(sub)
-        }
-    }
-}
-
-/**
- * Groups elements of the original channel by the key returned by the given [keySelector] function
- * applied to each element and returns a map where each group key is associated with a list of corresponding elements.
- *
- * The returned map preserves the entry iteration order of the keys produced from the original channel.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, K> ReceiveChannel<T>.groupBy(keySelector: suspend (T) -> K): Map<K, List<T>> {
-    return groupByTo(LinkedHashMap<K, MutableList<T>>(), keySelector)
-}
-
-/**
- * Groups values returned by the [valueTransform] function applied to each element of the original channel
- * by the key returned by the given [keySelector] function applied to the element
- * and returns a map where each group key is associated with a list of corresponding values.
- *
- * The returned map preserves the entry iteration order of the keys produced from the original channel.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, K, V> ReceiveChannel<T>.groupBy(keySelector: suspend (T) -> K, valueTransform: suspend (T) -> V): Map<K, List<V>> {
-    return groupByTo(LinkedHashMap<K, MutableList<V>>(), keySelector, valueTransform)
-}
-
-/**
- * Groups elements of the original channel by the key returned by the given [keySelector] function
- * applied to each element and puts to the [destination] map each group key associated with a list of corresponding elements.
- *
- * @return The [destination] map.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, K, M : MutableMap<in K, MutableList<T>>> ReceiveChannel<T>.groupByTo(destination: M, keySelector: suspend (T) -> K): M {
-    for (element in this) {
-        val key = keySelector(element)
-        val list = destination.getOrPut(key) { ArrayList<T>() }
-        list.add(element)
-    }
-    return destination
-}
-
-/**
- * Groups values returned by the [valueTransform] function applied to each element of the original channel
- * by the key returned by the given [keySelector] function applied to the element
- * and puts to the [destination] map each group key associated with a list of corresponding values.
- *
- * @return The [destination] map.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<T>.groupByTo(destination: M, keySelector: suspend (T) -> K, valueTransform: suspend (T) -> V): M {
-    for (element in this) {
-        val key = keySelector(element)
-        val list = destination.getOrPut(key) { ArrayList<V>() }
-        list.add(valueTransform(element))
-    }
-    return destination
-}
-
-/**
- * Returns a channel containing the results of applying the given [transform] function
- * to each element in the original channel.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T, R> ReceiveChannel<T>.map(transform: suspend (T) -> R): ReceiveChannel<R> = produce(Unconfined) {
-    for (element in this@map) {
-        send(transform(element))
-
-    }
-}
-
-/**
- * Returns a channel containing the results of applying the given [transform] function
- * to each element and its index in the original channel.
- * @param [transform] function that takes the index of an element and the element itself
- * and returns the result of the transform applied to the element.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T, R> ReceiveChannel<T>.mapIndexed(transform: suspend (index: Int, T) -> R): ReceiveChannel<R> = produce(Unconfined) {
-    var index = 0
-    for (element in this@mapIndexed) {
-        send(transform(index++, element))
-
-    }
-}
-
-/**
- * Returns a channel containing only the non-null results of applying the given [transform] function
- * to each element and its index in the original channel.
- * @param [transform] function that takes the index of an element and the element itself
- * and returns the result of the transform applied to the element.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T, R : Any> ReceiveChannel<T>.mapIndexedNotNull(transform: suspend (index: Int, T) -> R?): ReceiveChannel<R> =
-        mapIndexed(transform).filterNotNull()
-
-/**
- * Applies the given [transform] function to each element and its index in the original channel
- * and appends only the non-null results to the given [destination].
- * @param [transform] function that takes the index of an element and the element itself
- * and returns the result of the transform applied to the element.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, R : Any, C : SendChannel<in R>> ReceiveChannel<T>.mapIndexedNotNullTo(destination: C, transform: suspend (index: Int, T) -> R?): C {
-    consumeEachIndexed { (index, element) ->
-        transform(index, element)?.let { destination.send(it) }
-    }
-    return destination
-}
-
-/**
- * Applies the given [transform] function to each element and its index in the original channel
- * and appends the results to the given [destination].
- * @param [transform] function that takes the index of an element and the element itself
- * and returns the result of the transform applied to the element.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, R, C : SendChannel<in R>> ReceiveChannel<T>.mapIndexedTo(destination: C, transform: suspend (index: Int, T) -> R): C {
-    var index = 0
-    for (item in this)
-        destination.send(transform(index++, item))
-    return destination
-}
-
-/**
- * Returns a channel containing only the non-null results of applying the given [transform] function
- * to each element in the original channel.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, R : Any> ReceiveChannel<T>.mapNotNull(transform: suspend (T) -> R?): ReceiveChannel<R> =
-        map(transform).filterNotNull()
-
-/**
- * Applies the given [transform] function to each element in the original channel
- * and appends only the non-null results to the given [destination].
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, R : Any, C : SendChannel<in R>> ReceiveChannel<T>.mapNotNullTo(destination: C, transform: suspend (T) -> R?): C {
-    consumeEach { element -> transform(element)?.let { destination.send(it) } }
-    return destination
-}
-
-/**
- * Applies the given [transform] function to each element of the original channel
- * and appends the results to the given [destination].
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, R, C : SendChannel<in R>> ReceiveChannel<T>.mapTo(destination: C, transform: suspend (T) -> R): C {
-    for (item in this)
-        destination.send(transform(item))
-    return destination
-}
-
-/**
- * Returns a channel of [IndexedValue] for each element of the original channel.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T> ReceiveChannel<T>.withIndex(): ReceiveChannel<IndexedValue<T>> = produce(CommonPool) {
-    var index = 0
-    for (element in this@withIndex) {
-        send(IndexedValue(index++, element))
-    }
-}
-
-/**
- * Returns a channel containing only distinct elements from the given channel.
- *
- * The elements in the resulting channel are in the same order as they were in the source channel.
- *
- * The operation is _intermediate_ and _stateful_.
- */
-public suspend fun <T> ReceiveChannel<T>.distinct(): ReceiveChannel<T> {
-    return this.distinctBy { it }
-}
-
-/**
- * Returns a channel containing only elements from the given channel
- * having distinct keys returned by the given [selector] function.
- *
- * The elements in the resulting channel are in the same order as they were in the source channel.
- *
- * The operation is _intermediate_ and _stateful_.
- */
-public suspend fun <T, K> ReceiveChannel<T>.distinctBy(selector: suspend (T) -> K): ReceiveChannel<T> = produce(Unconfined) {
-    val keys = HashSet<K>()
-    for (element in this@distinctBy) {
-        val k = selector(element)
-        if (k !in keys) {
-            send(element)
-            keys += k
-        }
-    }
-}
-
-/**
- * Returns a mutable set containing all distinct elements from the given channel.
- *
- * The returned set preserves the element iteration order of the original channel.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.toMutableSet(): MutableSet<T> {
-    val set = LinkedHashSet<T>()
-    for (item in this) set.add(item)
-    return set
-}
-
-/**
- * Returns `true` if all elements match the given [predicate].
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.all(predicate: suspend (T) -> Boolean): Boolean {
-    for (element in this) if (!predicate(element)) return false
-    return true
-}
-
-/**
- * Returns `true` if channel has at least one element.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.any(): Boolean {
-    for (element in this) return true
-    return false
-}
-
-/**
- * Returns `true` if at least one element matches the given [predicate].
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.any(predicate: suspend (T) -> Boolean): Boolean {
-    for (element in this) if (predicate(element)) return true
-    return false
-}
-
-/**
- * Returns the number of elements in this channel.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.count(): Int {
-    var count = 0
-    for (element in this) count++
-    return count
-}
-
-/**
- * Returns the number of elements matching the given [predicate].
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.count(predicate: suspend (T) -> Boolean): Int {
-    var count = 0
-    for (element in this) if (predicate(element)) count++
-    return count
-}
-
-/**
- * Accumulates value starting with [initial] value and applying [operation] from left to right to current accumulator value and each element.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, R> ReceiveChannel<T>.fold(initial: R, operation: suspend (acc: R, T) -> R): R {
-    var accumulator = initial
-    for (element in this) accumulator = operation(accumulator, element)
-    return accumulator
-}
-
-/**
- * Accumulates value starting with [initial] value and applying [operation] from left to right
- * to current accumulator value and each element with its index in the original channel.
- * @param [operation] function that takes the index of an element, current accumulator value
- * and the element itself, and calculates the next accumulator value.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, R> ReceiveChannel<T>.foldIndexed(initial: R, operation: suspend (index: Int, acc: R, T) -> R): R {
-    var index = 0
-    var accumulator = initial
-    for (element in this) accumulator = operation(index++, accumulator, element)
-    return accumulator
-}
-
-/**
- * Returns the first element yielding the largest value of the given function or `null` if there are no elements.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T, R : Comparable<R>> ReceiveChannel<T>.maxBy(selector: suspend (T) -> R): T? {
-    val iterator = iterator()
-    if (!iterator.hasNext()) return null
-    var maxElem = iterator.next()
-    var maxValue = selector(maxElem)
-    while (iterator.hasNext()) {
-        val e = iterator.next()
-        val v = selector(e)
-        if (maxValue < v) {
-            maxElem = e
-            maxValue = v
-        }
-    }
-    return maxElem
-}
-
-/**
- * Returns the first element having the largest value according to the provided [comparator] or `null` if there are no elements.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.maxWith(comparator: Comparator<in T>): T? {
-    val iterator = iterator()
-    if (!iterator.hasNext()) return null
-    var max = iterator.next()
-    while (iterator.hasNext()) {
-        val e = iterator.next()
-        if (comparator.compare(max, e) < 0) max = e
-    }
-    return max
-}
-
-/**
- * Returns the first element yielding the smallest value of the given function or `null` if there are no elements.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T, R : Comparable<R>> ReceiveChannel<T>.minBy(selector: suspend (T) -> R): T? {
-    val iterator = iterator()
-    if (!iterator.hasNext()) return null
-    var minElem = iterator.next()
-    var minValue = selector(minElem)
-    while (iterator.hasNext()) {
-        val e = iterator.next()
-        val v = selector(e)
-        if (minValue > v) {
-            minElem = e
-            minValue = v
-        }
-    }
-    return minElem
-}
-
-/**
- * Returns the first element having the smallest value according to the provided [comparator] or `null` if there are no elements.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.minWith(comparator: Comparator<in T>): T? {
-    val iterator = iterator()
-    if (!iterator.hasNext()) return null
-    var min = iterator.next()
-    while (iterator.hasNext()) {
-        val e = iterator.next()
-        if (comparator.compare(min, e) > 0) min = e
-    }
-    return min
-}
-
-/**
- * Returns `true` if the channel has no elements.
- *
- * The operation is _terminal_.
- */
-public suspend fun <T> ReceiveChannel<T>.none(): Boolean {
-    for (element in this) return false
-    return true
-}
-
-/**
- * Returns `true` if no elements match the given [predicate].
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.none(predicate: suspend (T) -> Boolean): Boolean {
-    for (element in this) if (predicate(element)) return false
-    return true
-}
-
-/**
- * Accumulates value starting with the first element and applying [operation] from left to right to current accumulator value and each element.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <S, T : S> ReceiveChannel<T>.reduce(operation: suspend (acc: S, T) -> S): S {
-    val iterator = this.iterator()
-    if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
-    var accumulator: S = iterator.next()
-    while (iterator.hasNext()) {
-        accumulator = operation(accumulator, iterator.next())
-    }
-    return accumulator
-}
-
-/**
- * Accumulates value starting with the first element and applying [operation] from left to right
- * to current accumulator value and each element with its index in the original channel.
- * @param [operation] function that takes the index of an element, current accumulator value
- * and the element itself and calculates the next accumulator value.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <S, T : S> ReceiveChannel<T>.reduceIndexed(operation: suspend (index: Int, acc: S, T) -> S): S {
-    val iterator = this.iterator()
-    if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
-    var index = 1
-    var accumulator: S = iterator.next()
-    while (iterator.hasNext()) {
-        accumulator = operation(index++, accumulator, iterator.next())
-    }
-    return accumulator
-}
-
-/**
- * Returns the sum of all values produced by [selector] function applied to each element in the channel.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.sumBy(selector: suspend (T) -> Int): Int {
-    var sum: Int = 0
-    for (element in this) {
-        sum += selector(element)
-    }
-    return sum
-}
-
-/**
- * Returns the sum of all values produced by [selector] function applied to each element in the channel.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.sumByDouble(selector: suspend (T) -> Double): Double {
-    var sum: Double = 0.0
-    for (element in this) {
-        sum += selector(element)
-    }
-    return sum
-}
-
-/**
- * Returns an original collection containing all the non-`null` elements, throwing an [IllegalArgumentException] if there are any `null` elements.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public suspend fun <T : Any> ReceiveChannel<T?>.requireNoNulls(): ReceiveChannel<T> {
-    return map { it ?: throw IllegalArgumentException("null element found in $this.") }
-}
-
-/**
- * Splits the original channel into pair of lists,
- * where *first* list contains elements for which [predicate] yielded `true`,
- * while *second* list contains elements for which [predicate] yielded `false`.
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T> ReceiveChannel<T>.partition(predicate: suspend (T) -> Boolean): Pair<List<T>, List<T>> {
-    val first = ArrayList<T>()
-    val second = ArrayList<T>()
-    for (element in this) {
-        if (predicate(element)) {
-            first.add(element)
-        } else {
-            second.add(element)
-        }
-    }
-    return Pair(first, second)
-}
-
-/**
- * Send each element of the original channel
- * and appends the results to the given [destination].
- *
- * The operation is _terminal_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, C : SendChannel<in T>> ReceiveChannel<T>.sendTo(destination: C): C {
-    for (item in this)
-        destination.send(item)
-    return destination
-}
-
-/**
- * Returns a channel of pairs built from elements of both channels with same indexes.
- * Resulting channel has length of shortest input channel.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-public infix suspend fun <T, R> ReceiveChannel<T>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<T, R>> {
-    return zip(other) { t1, t2 -> t1 to t2 }
-}
-
-/**
- * Returns a channel of values built from elements of both collections with same indexes using provided [transform]. Resulting channel has length of shortest input channels.
- *
- * The operation is _intermediate_ and _stateless_.
- */
-// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
-public suspend fun <T, R, V> ReceiveChannel<T>.zip(other: ReceiveChannel<R>, transform: suspend (a: T, b: R) -> V): ReceiveChannel<V> = produce(Unconfined) {
-    for (element1 in this@zip) {
-        val element2 = other.receiveOrNull() ?: break
-        send(transform(element1, element2))
-    }
-}
+// -------- Operations on SendChannel  --------
 
 /**
  * Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull],
@@ -1288,3 +40,1482 @@
         send(element)
     }
 }
+
+// -------- Conversions to ReceiveChannel  --------
+
+/**
+ * Returns a channel to read all elements of the [Iterable].
+ */
+public fun <E> Iterable<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> =
+    produce(context) {
+        for (element in this@asReceiveChannel)
+            send(element)
+    }
+
+/**
+ * Returns a channel to read all elements of the [Sequence].
+ */
+public fun <E> Sequence<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> =
+    produce(context) {
+        for (element in this@asReceiveChannel)
+            send(element)
+    }
+
+// -------- Operations on BroadcastChannel --------
+
+/**
+ * Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements
+ * from it by always invoking [cancel][SubscriptionReceiveChannel.cancel] after the execution of the block.
+ */
+public inline fun <E, R> BroadcastChannel<E>.consume(block: SubscriptionReceiveChannel<E>.() -> R): R {
+    val channel = openSubscription()
+    try {
+        return channel.block()
+    } finally {
+        channel.cancel()
+    }
+}
+
+/**
+ * Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
+ */
+public inline suspend fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit) =
+    consume {
+        for (element in this) action(element)
+    }
+
+/**
+ * @suppress: **Deprecated**: binary compatibility with old code
+ */
+@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> BroadcastChannel<E>.consumeEach(action: suspend (E) -> Unit) =
+    consumeEach { action(it) }
+
+// -------- Operations on ReceiveChannel --------
+
+/**
+ * Makes sure that the given [block] consumes all elements from the given channel
+ * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
+ */
+public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R =
+    try {
+        block()
+    } finally {
+        cancel()
+    }
+
+/**
+ * Performs the given [action] for each received element.
+ *
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) =
+    consume {
+        for (element in this) action(element)
+    }
+
+/**
+ * @suppress: **Deprecated**: binary compatibility with old code
+ */
+@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit) =
+    consumeEach { action(it) }
+
+/**
+ * Performs the given [action] for each received element, passing it wrapped in an [IndexedValue] together with its zero-based index.
+ *
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) {
+    var index = 0
+    consumeEach {
+        action(IndexedValue(index++, it))
+    }
+}
+
+/**
+ * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E =
+    elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") }
+
+/**
+ * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E =
+    consume {
+        if (index < 0)
+            return defaultValue(index)
+        var count = 0
+        for (element in this) {
+            if (index == count++)
+                return element
+        }
+        return defaultValue(index)
+    }
+
+/**
+ * Returns an element at the given [index] or `null` if the [index] is out of bounds of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
+    consume {
+        if (index < 0)
+            return null
+        var count = 0
+        for (element in this) {
+            if (index == count++)
+                return element
+        }
+        return null
+    }
+
+/**
+ * Returns the first element matching the given [predicate], or `null` if no such element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? =
+    firstOrNull(predicate)
+
+/**
+ * Returns the last element matching the given [predicate], or `null` if no such element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? =
+    lastOrNull(predicate)
+
+/**
+ * Returns the first element.
+ * @throws [NoSuchElementException] if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.first(): E =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext())
+            throw NoSuchElementException("ReceiveChannel is empty.")
+        return iterator.next()
+    }
+
+/**
+ * Returns the first element matching the given [predicate].
+ * @throws [NoSuchElementException] if no such element is found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E {
+    consumeEach {
+        if (predicate(it)) return it
+    }
+    throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
+}
+
+/**
+ * Returns the first element, or `null` if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext())
+            return null
+        return iterator.next()
+    }
+
+/**
+ * Returns the first element matching the given [predicate], or `null` if element was not found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.firstOrNull(predicate: (E) -> Boolean): E? {
+    consumeEach {
+        if (predicate(it)) return it
+    }
+    return null
+}
+
+/**
+ * Returns the first index of [element], or -1 if the channel does not contain the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
+    var index = 0
+    consumeEach {
+        if (element == it)
+            return index
+        index++
+    }
+    return -1
+}
+
+/**
+ * Returns index of the first element matching the given [predicate], or -1 if the channel does not contain such element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int {
+    var index = 0
+    consumeEach {
+        if (predicate(it))
+            return index
+        index++
+    }
+    return -1
+}
+
+/**
+ * Returns index of the last element matching the given [predicate], or -1 if the channel does not contain such element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.indexOfLast(predicate: (E) -> Boolean): Int {
+    var lastIndex = -1
+    var index = 0
+    consumeEach {
+        if (predicate(it))
+            lastIndex = index
+        index++
+    }
+    return lastIndex
+}
+
+/**
+ * Returns the last element.
+ * @throws [NoSuchElementException] if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.last(): E =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext())
+            throw NoSuchElementException("ReceiveChannel is empty.")
+        var last = iterator.next()
+        while (iterator.hasNext())
+            last = iterator.next()
+        return last
+    }
+
+/**
+ * Returns the last element matching the given [predicate].
+ * @throws [NoSuchElementException] if no such element is found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E {
+    var last: E? = null
+    var found = false
+    consumeEach {
+        if (predicate(it)) {
+            last = it
+            found = true
+        }
+    }
+    if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
+    @Suppress("UNCHECKED_CAST")
+    return last as E
+}
+
+/**
+ * Returns the last index of [element], or -1 if the channel does not contain the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
+    var lastIndex = -1
+    var index = 0
+    consumeEach {
+        if (element == it)
+            lastIndex = index
+        index++
+    }
+    return lastIndex
+}
+
+/**
+ * Returns the last element, or `null` if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext())
+            return null
+        var last = iterator.next()
+        while (iterator.hasNext())
+            last = iterator.next()
+        return last
+    }
+
+/**
+ * Returns the last element matching the given [predicate], or `null` if no such element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? {
+    var last: E? = null
+    consumeEach {
+        if (predicate(it)) {
+            last = it
+        }
+    }
+    return last
+}
+
+/**
+ * Returns the single element, or throws an exception if the channel is empty or has more than one element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.single(): E =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext())
+            throw NoSuchElementException("ReceiveChannel is empty.")
+        val single = iterator.next()
+        if (iterator.hasNext())
+            throw IllegalArgumentException("ReceiveChannel has more than one element.")
+        return single
+    }
+
+/**
+ * Returns the single element matching the given [predicate], or throws exception if there is no or more than one matching element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E {
+    var single: E? = null
+    var found = false
+    consumeEach {
+        if (predicate(it)) {
+            if (found) throw IllegalArgumentException("ReceiveChannel contains more than one matching element.")
+            single = it
+            found = true
+        }
+    }
+    if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
+    @Suppress("UNCHECKED_CAST")
+    return single as E
+}
+
+/**
+ * Returns the single element, or `null` if the channel is empty or has more than one element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
+    consume {
+        val iterator = iterator()
+        if (!iterator.hasNext())
+            return null
+        val single = iterator.next()
+        if (iterator.hasNext())
+            return null
+        return single
+    }
+
+/**
+ * Returns the single element matching the given [predicate], or `null` if element was not found or more than one element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? {
+    var single: E? = null
+    var found = false
+    consumeEach {
+        if (predicate(it)) {
+            if (found) return null
+            single = it
+            found = true
+        }
+    }
+    if (!found) return null
+    return single
+}
+
+/**
+ * Returns a channel containing all elements except first [n] elements.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
+    produce(context) {
+        consume {
+            require(n >= 0) { "Requested element count $n is less than zero." }
+            var remaining: Int = n
+            if (remaining > 0)
+                for (element in this@drop) {
+                    remaining--
+                    if (remaining == 0)
+                        break
+                }
+            for (element in this@drop) {
+                send(element)
+            }
+        }
+    }
+
+/**
+ * Returns a channel containing all elements except first elements that satisfy the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+    produce(context) {
+        consume {
+            for (element in this@dropWhile) {
+                if (!predicate(element)) {
+                    send(element)
+                    break
+                }
+            }
+            for (element in this@dropWhile) {
+                send(element)
+            }
+        }
+    }
+
+/**
+ * Returns a channel containing only elements matching the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+    produce(context) {
+        consumeEach {
+            if (predicate(it)) send(it)
+        }
+    }
+
+/**
+ * Returns a channel containing only elements matching the given [predicate].
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> =
+    produce(context) {
+        var index = 0
+        consumeEach {
+            if (predicate(index++, it)) send(it)
+        }
+    }
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
+    consumeEachIndexed { (index, element) ->
+        if (predicate(index, element)) destination.add(element)
+    }
+    return destination
+}
+
+/**
+ * Sends all elements matching the given [predicate] to the given [destination] channel.
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
+    consumeEachIndexed { (index, element) ->
+        if (predicate(index, element)) destination.send(element)
+    }
+    return destination
+}
+
+/**
+ * Returns a channel containing all elements not matching the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.filterNot(predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+    filter { !predicate(it) }
+
+/**
+ * Returns a channel containing all elements that are not `null`.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+@Suppress("UNCHECKED_CAST")
+public fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
+    filter { it != null } as ReceiveChannel<E>
+
+/**
+ * Appends all elements that are not `null` to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
+    consumeEach {
+        if (it != null) destination.add(it)
+    }
+    return destination
+}
+
+/**
+ * Sends all elements that are not `null` to the given [destination] channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
+    consumeEach {
+        if (it != null) destination.send(it)
+    }
+    return destination
+}
+
+/**
+ * Appends all elements not matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
+    consumeEach {
+        if (!predicate(it)) destination.add(it)
+    }
+    return destination
+}
+
+/**
+ * Sends all elements not matching the given [predicate] to the given [destination] channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
+    consumeEach {
+        if (!predicate(it)) destination.send(it)
+    }
+    return destination
+}
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
+    consumeEach {
+        if (predicate(it)) destination.add(it)
+    }
+    return destination
+}
+
+/**
+ * Sends all elements matching the given [predicate] to the given [destination] channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
+    consumeEach {
+        if (predicate(it)) destination.send(it)
+    }
+    return destination
+}
+
+/**
+ * Returns a channel containing first [n] elements.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
+    produce(context) {
+        consume {
+            if (n == 0) return@produce
+            require(n >= 0) { "Requested element count $n is less than zero." }
+            var remaining: Int = n
+            for (element in this@take) {
+                send(element)
+                remaining--
+                if (remaining == 0)
+                    return@produce
+            }
+        }
+    }
+
+/**
+ * Returns a channel containing first elements satisfying the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+    produce(context) {
+        consumeEach {
+            if (!predicate(it)) return@produce
+            send(it)
+        }
+    }
+
+/**
+ * Returns a [Map] containing key-value pairs provided by [transform] function
+ * applied to elements of the given channel.
+ *
+ * If any of two pairs would have the same key the last one gets added to the map.
+ *
+ * The returned map preserves the entry iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> =
+    associateTo(LinkedHashMap(), transform)
+
+/**
+ * Returns a [Map] containing the elements from the given channel indexed by the key
+ * returned from [keySelector] function applied to each element.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The returned map preserves the entry iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> =
+    associateByTo(LinkedHashMap(), keySelector)
+
+/**
+ * Returns a [Map] containing the values provided by [valueTransform] and indexed by [keySelector] functions applied to elements of the given channel.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The returned map preserves the entry iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> =
+    associateByTo(LinkedHashMap(), keySelector, valueTransform)
+
+/**
+ * Populates and returns the [destination] mutable map with key-value pairs,
+ * where key is provided by the [keySelector] function applied to each element of the given channel
+ * and value is the element itself.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M {
+    consumeEach {
+        destination.put(keySelector(it), it)
+    }
+    return destination
+}
+
+/**
+ * Populates and returns the [destination] mutable map with key-value pairs,
+ * where key is provided by the [keySelector] function
+ * and value is provided by the [valueTransform] function applied to elements of the given channel.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
+    consumeEach {
+        destination.put(keySelector(it), valueTransform(it))
+    }
+    return destination
+}
+
+/**
+ * Populates and returns the [destination] mutable map with key-value pairs
+ * provided by [transform] function applied to each element of the given channel.
+ *
+ * If any of two pairs would have the same key the last one gets added to the map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M {
+    consumeEach {
+        destination += transform(it)
+    }
+    return destination
+}
+
+/**
+ * Sends each element of the original channel
+ * to the given [destination] channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
+    consumeEach {
+        destination.send(it)
+    }
+    return destination
+}
+
+/**
+ * Appends all elements to the given [destination] collection.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
+    consumeEach {
+        destination.add(it)
+    }
+    return destination
+}
+
+/**
+ * Returns a [List] containing all elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toList(): List<E> =
+    this.toMutableList()
+
+/**
+ * Returns a [Map] filled with all elements of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> =
+    toMap(LinkedHashMap())
+
+/**
+ * Returns a [MutableMap] filled with all elements of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
+    consumeEach {
+        destination += it
+    }
+    return destination
+}
+
+/**
+ * Returns a [MutableList] filled with all elements of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> =
+    toCollection(ArrayList())
+
+/**
+ * Returns a [Set] of all elements.
+ *
+ * The returned set preserves the element iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> =
+    toCollection(LinkedHashSet())
+
+/**
+ * Returns a single channel of all elements from results of [transform] function being invoked on each element of original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
+    produce(context) {
+        consumeEach {
+            transform(it).consumeEach {
+                send(it)
+            }
+        }
+    }
+
+/**
+ * Groups elements of the original channel by the key returned by the given [keySelector] function
+ * applied to each element and returns a map where each group key is associated with a list of corresponding elements.
+ *
+ * The returned map preserves the entry iteration order of the keys produced from the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> =
+    groupByTo(LinkedHashMap(), keySelector)
+
+/**
+ * Groups values returned by the [valueTransform] function applied to each element of the original channel
+ * by the key returned by the given [keySelector] function applied to the element
+ * and returns a map where each group key is associated with a list of corresponding values.
+ *
+ * The returned map preserves the entry iteration order of the keys produced from the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> =
+    groupByTo(LinkedHashMap(), keySelector, valueTransform)
+
+/**
+ * Appends each element of the original channel to the list that the [destination] map holds
+ * for the key returned by [keySelector], creating that list on the key's first occurrence.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @return The [destination] map.
+ */
+public inline suspend fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M {
+    consumeEach { element ->
+        // getOrPut installs a fresh list the first time a key is encountered
+        val group = destination.getOrPut(keySelector(element)) { ArrayList<E>() }
+        group.add(element)
+    }
+    return destination
+}
+
+/**
+ * Applies [valueTransform] to each element of the original channel and appends the result
+ * to the list that the [destination] map holds for the key returned by [keySelector],
+ * creating that list on the key's first occurrence.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @return The [destination] map.
+ */
+public inline suspend fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
+    consumeEach { element ->
+        // the group list is looked up (or created) before the value is computed
+        val group = destination.getOrPut(keySelector(element)) { ArrayList<V>() }
+        group.add(valueTransform(element))
+    }
+    return destination
+}
+
+/**
+ * Returns a channel with the results of applying the given [transform] function to each element of the original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
+    produce(context) {
+        consumeEach { element ->
+            val mapped = transform(element)
+            send(mapped)
+        }
+    }
+
+/**
+ * Returns a channel with the results of applying the given [transform] function
+ * to each element of the original channel together with its zero-based index.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param transform receives the index of an element and the element itself and returns the mapped result.
+ */
+// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> =
+    produce(context) {
+        var nextIndex = 0
+        consumeEach { element ->
+            send(transform(nextIndex++, element))
+        }
+    }
+
+/**
+ * Returns a channel with the results of applying the given [transform] function
+ * to each element of the original channel and its index, leaving out the `null` results.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param transform receives the index of an element and the element itself and returns the mapped result.
+ */
+// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(context: CoroutineContext = Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel<R> =
+    mapIndexed(context = context, transform = transform).filterNotNull()
+
+/**
+ * Calls the given [transform] function with each element of the original channel and its index,
+ * adding every non-null result to the given [destination] collection, which is then returned.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param transform receives the index of an element and the element itself and returns the mapped result.
+ */
+public inline suspend fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
+    consumeEachIndexed { (position, item) ->
+        transform(position, item)?.let { result -> destination.add(result) }
+    }
+    return destination
+}
+
+/**
+ * Calls the given [transform] function with each element of the original channel and its index,
+ * sending every non-null result to the given [destination] channel, which is then returned.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param transform receives the index of an element and the element itself and returns the mapped result.
+ */
+public inline suspend fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
+    consumeEachIndexed { (position, item) ->
+        transform(position, item)?.let { result -> destination.send(result) }
+    }
+    return destination
+}
+
+/**
+ * Calls the given [transform] function with each element of the original channel and its zero-based index,
+ * adding every result to the given [destination] collection, which is then returned.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param transform receives the index of an element and the element itself and returns the mapped result.
+ */
+public inline suspend fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
+    var nextIndex = 0
+    consumeEach { element ->
+        destination.add(transform(nextIndex++, element))
+    }
+    return destination
+}
+
+/**
+ * Calls the given [transform] function with each element of the original channel and its zero-based index,
+ * sending every result to the given [destination] channel, which is then returned.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param transform receives the index of an element and the element itself and returns the mapped result.
+ */
+public inline suspend fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
+    var nextIndex = 0
+    consumeEach { element ->
+        destination.send(transform(nextIndex++, element))
+    }
+    return destination
+}
+
+/**
+ * Returns a channel with the results of applying the given [transform] function
+ * to each element of the original channel, leaving out the `null` results.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(context: CoroutineContext = Unconfined, transform: suspend (E) -> R?): ReceiveChannel<R> =
+    map(context = context, transform = transform).filterNotNull()
+
+/**
+ * Calls the given [transform] function with each element of the original channel,
+ * adding every non-null result to the given [destination] collection, which is then returned.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
+    consumeEach { element ->
+        transform(element)?.let { result -> destination.add(result) }
+    }
+    return destination
+}
+
+/**
+ * Calls the given [transform] function with each element of the original channel,
+ * sending every non-null result to the given [destination] channel, which is then returned.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
+    consumeEach { element ->
+        transform(element)?.let { result -> destination.send(result) }
+    }
+    return destination
+}
+
+/**
+ * Calls the given [transform] function with each element of the original channel,
+ * adding every result to the given [destination] collection, which is then returned.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
+    consumeEach { element ->
+        destination.add(transform(element))
+    }
+    return destination
+}
+
+/**
+ * Calls the given [transform] function with each element of the original channel,
+ * sending every result to the given [destination] channel, which is then returned.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
+    consumeEach { element ->
+        destination.send(transform(element))
+    }
+    return destination
+}
+
+/**
+ * Returns a channel that wraps each element of the original channel into an [IndexedValue] carrying its zero-based index.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Unconfined): ReceiveChannel<IndexedValue<E>> =
+    produce(context) {
+        var nextIndex = 0
+        consumeEach { element ->
+            send(IndexedValue(nextIndex++, element))
+        }
+    }
+
+/**
+ * Returns a channel that drops repeated elements of the given channel, keeping only distinct ones.
+ *
+ * Elements of the resulting channel keep the order they had in the source channel.
+ *
+ * The operation is _intermediate_ and _stateful_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> =
+    distinctBy { element -> element }
+
+/**
+ * Returns a channel containing only those elements of the given channel
+ * whose key, as returned by the given [selector] function, has not been produced by an earlier element.
+ *
+ * Elements of the resulting channel keep the order they had in the source channel.
+ *
+ * The operation is _intermediate_ and _stateful_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+// todo: mark selector with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> =
+    produce(context) {
+        val seenKeys = HashSet<K>()
+        consumeEach { element ->
+            val key = selector(element)
+            if (!seenKeys.contains(key)) {
+                send(element)
+                seenKeys.add(key)
+            }
+        }
+    }
+
+/**
+ * Collects the distinct elements of the given channel into a new mutable set and returns it.
+ *
+ * The returned set iterates its elements in the order they first appeared in the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> {
+    val distinctElements = LinkedHashSet<E>()
+    consumeEach { element ->
+        distinctElements.add(element)
+    }
+    return distinctElements
+}
+
+/**
+ * Returns `true` if every element satisfies the given [predicate]; returns `false` at the first element that does not.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean {
+    consumeEach { element ->
+        if (!predicate(element)) return false
+    }
+    return true
+}
+
+/**
+ * Returns `true` if the channel delivers at least one element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
+    consume {
+        return this.iterator().hasNext()
+    }
+
+/**
+ * Returns `true` as soon as an element satisfies the given [predicate], or `false` if none does.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean {
+    consumeEach { element ->
+        if (predicate(element)) return true
+    }
+    return false
+}
+
+/**
+ * Returns how many elements this channel delivers.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.count(): Int {
+    var total = 0
+    consumeEach { total += 1 }
+    return total
+}
+
+/**
+ * Returns how many elements of this channel satisfy the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int {
+    var matches = 0
+    consumeEach { element ->
+        if (predicate(element)) matches++
+    }
+    return matches
+}
+
+/**
+ * Folds the channel into a single value: starts with [initial] and applies [operation] from left to right to the current accumulator value and each element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
+    var result = initial
+    consumeEach { element ->
+        result = operation(result, element)
+    }
+    return result
+}
+
+/**
+ * Folds the channel into a single value: starts with [initial] and applies [operation] from left to right
+ * to the current accumulator value and each element with its zero-based index in the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param operation receives the index of an element, the current accumulator value and the element itself, and returns the next accumulator value.
+ */
+public inline suspend fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R {
+    var position = 0
+    var result = initial
+    consumeEach { element ->
+        result = operation(position++, result, element)
+    }
+    return result
+}
+
+/**
+ * Returns the first element for which the given [selector] yields the largest value, or `null` if the channel has no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? =
+    consume {
+        val elements = iterator()
+        if (!elements.hasNext()) return null
+        var best = elements.next()
+        var bestKey = selector(best)
+        while (elements.hasNext()) {
+            val candidate = elements.next()
+            val candidateKey = selector(candidate)
+            if (bestKey < candidateKey) {
+                best = candidate
+                bestKey = candidateKey
+            }
+        }
+        return best
+    }
+
+/**
+ * Returns the first element that the provided [comparator] ranks largest, or `null` if the channel has no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
+    consume {
+        val elements = iterator()
+        if (!elements.hasNext()) return null
+        var best = elements.next()
+        while (elements.hasNext()) {
+            val candidate = elements.next()
+            if (comparator.compare(best, candidate) < 0) best = candidate
+        }
+        return best
+    }
+
+/**
+ * Returns the first element for which the given [selector] yields the smallest value, or `null` if the channel has no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? =
+    consume {
+        val elements = iterator()
+        if (!elements.hasNext()) return null
+        var best = elements.next()
+        var bestKey = selector(best)
+        while (elements.hasNext()) {
+            val candidate = elements.next()
+            val candidateKey = selector(candidate)
+            if (bestKey > candidateKey) {
+                best = candidate
+                bestKey = candidateKey
+            }
+        }
+        return best
+    }
+
+/**
+ * Returns the first element that the provided [comparator] ranks smallest, or `null` if the channel has no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
+    consume {
+        val elements = iterator()
+        if (!elements.hasNext()) return null
+        var best = elements.next()
+        while (elements.hasNext()) {
+            val candidate = elements.next()
+            if (comparator.compare(best, candidate) > 0) best = candidate
+        }
+        return best
+    }
+
+/**
+ * Returns `true` if the channel delivers no elements at all.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
+    consume {
+        return !this.iterator().hasNext()
+    }
+
+/**
+ * Returns `true` if no element satisfies the given [predicate]; returns `false` at the first element that does.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean {
+    consumeEach { element ->
+        if (predicate(element)) return false
+    }
+    return true
+}
+
+/**
+ * Reduces the channel to a single value: starts with the first element and applies [operation] from left to right to the current accumulator value and each following element; throws [UnsupportedOperationException] if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S =
+    consume {
+        val elements = this.iterator()
+        if (!elements.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
+        var result: S = elements.next()
+        while (elements.hasNext()) {
+            result = operation(result, elements.next())
+        }
+        return result
+    }
+
+/**
+ * Reduces the channel to a single value: starts with the first element and applies [operation] from left to right to the current
+ * accumulator value and each following element with its index (the second element has index 1); throws [UnsupportedOperationException] if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param operation receives the index of an element, the current accumulator value and the element itself, and returns the next accumulator value.
+ */
+public inline suspend fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S =
+    consume {
+        val elements = this.iterator()
+        if (!elements.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
+        var position = 1
+        var result: S = elements.next()
+        while (elements.hasNext()) {
+            result = operation(position++, result, elements.next())
+        }
+        return result
+    }
+
+/**
+ * Returns the total of the [Int] values that the [selector] function produces for each element in the channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int {
+    var total = 0
+    consumeEach { element ->
+        total += selector(element)
+    }
+    return total
+}
+
+/**
+ * Returns the total of the [Double] values that the [selector] function produces for each element in the channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double {
+    var total = 0.0
+    consumeEach { element ->
+        total += selector(element)
+    }
+    return total
+}
+
+/**
+ * Returns a channel containing all the elements of the original channel typed as non-`null`, failing with an [IllegalArgumentException] when a `null` element is encountered.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
+    map { element -> element ?: throw IllegalArgumentException("null element found in $this.") }
+
+/**
+ * Distributes the elements of the original channel over a pair of lists:
+ * the *first* list receives the elements for which [predicate] yielded `true`,
+ * the *second* list receives the elements for which [predicate] yielded `false`.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ */
+public inline suspend fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> {
+    val matching = ArrayList<E>()
+    val rest = ArrayList<E>()
+    consumeEach { element ->
+        // the predicate is evaluated exactly once per element
+        when {
+            predicate(element) -> matching.add(element)
+            else -> rest.add(element)
+        }
+    }
+    return Pair(matching, rest)
+}
+
+/**
+ * Returns a channel of pairs, each combining the elements that have the same index in this channel and in [other].
+ * The resulting channel is as long as the shorter of the two input channels.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of both the original [ReceiveChannel] and the `other` one.
+ */
+public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
+    zip(other) { left, right -> Pair(left, right) }
+
+/**
+ * Returns a channel of values built by applying the provided [transform] to the elements that have the same index in this channel and in [other]. The resulting channel is as long as the shorter of the two input channels.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of both the original [ReceiveChannel] and the `other` one.
+ */
+public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
+    produce(context) {
+        other.consume {
+            val partners = other.iterator()
+            this@zip.consumeEach { left ->
+                // an element with no partner left in `other` is skipped;
+                // the iteration over this channel is not stopped here
+                if (partners.hasNext()) send(transform(left, partners.next()))
+            }
+        }
+    }
+
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
index 9d092c8..95ae090 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
@@ -249,10 +249,10 @@
     private class Subscriber<E>(
         private val broadcastChannel: ConflatedBroadcastChannel<E>
     ) : ConflatedChannel<E>(), SubscriptionReceiveChannel<E> {
-        override fun close() {
-            if (close(cause = null))
-                broadcastChannel.closeSubscriber(this)
-        }
+        override fun cancel(cause: Throwable?): Boolean =
+            close(cause).also { closed ->
+                if (closed) broadcastChannel.closeSubscriber(this)
+            }
 
         public override fun offerInternal(element: E): Any = super.offerInternal(element)
     }
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
index 0ed752d..5152e8a 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
@@ -36,39 +36,24 @@
 }
 
 /**
- * @suppress **Deprecated**: Renamed to `ProducerScope`.
+ * @suppress **Deprecated**: Use `ReceiveChannel`.
  */
-@Deprecated(message = "Renamed to `ProducerScope`", replaceWith = ReplaceWith("ProducerScope"))
-typealias ChannelBuilder<E> = ProducerScope<E>
-
-/**
- * Return type for [produce] coroutine builder.
- */
-public interface ProducerJob<out E> : Job, ReceiveChannel<E> {
-    /**
-     * A reference to the channel that this coroutine is producing.
-     * All the [ReceiveChannel] functions on this interface delegate to
-     * the channel instance returned by this function.
-     */
+@Deprecated(message = "Use `ReceiveChannel`", replaceWith = ReplaceWith("ReceiveChannel"))
+interface ProducerJob<out E> : ReceiveChannel<E> {
+    @Deprecated(message = "Use ReceiveChannel itself")
     val channel: ReceiveChannel<E>
 }
 
 /**
- * @suppress **Deprecated**: Renamed to `ProducerJob`.
- */
-@Deprecated(message = "Renamed to `ProducerJob`", replaceWith = ReplaceWith("ProducerJob"))
-typealias ChannelJob<E> = ProducerJob<E>
-
-/**
  * Launches new coroutine to produce a stream of values by sending them to a channel
- * and returns a reference to the coroutine as a [ProducerJob]. This resulting
+ * and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
  * object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
  *
  * The scope of the coroutine contains [ProducerScope] interface, which implements
  * both [CoroutineScope] and [SendChannel], so that coroutine can invoke
  * [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
  * when the coroutine completes.
- * The running coroutine is cancelled when the its job is [cancelled][Job.cancel].
+ * The running coroutine is cancelled when its receive channel is [cancelled][ReceiveChannel.cancel].
  *
  * The [context] for the new coroutine can be explicitly specified.
  * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
@@ -89,7 +74,7 @@
     context: CoroutineContext = DefaultDispatcher,
     capacity: Int = 0,
     block: suspend ProducerScope<E>.() -> Unit
-): ProducerJob<E> {
+): ReceiveChannel<E> {
     val channel = Channel<E>(capacity)
     return ProducerCoroutine(newCoroutineContext(context), channel).apply {
         initParentJob(context[Job])
@@ -106,7 +91,7 @@
     capacity: Int = 0,
     block: suspend ProducerScope<E>.() -> Unit
 ): ProducerJob<E> =
-    produce(context, capacity, block)
+    produce(context, capacity, block) as ProducerJob<E>
 
 private class ProducerCoroutine<E>(parentContext: CoroutineContext, channel: Channel<E>) :
     ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E>, ProducerJob<E>