Improve ReceiveChannel operators implementations to guarantee closing
of the source channels under all circumstances;
`onCompletion` added to `produce` builder;
`ReceiveChannel.consumes(): CompletionHandler` extension fun.

Fixes #279
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
index 4623ece..87abbf5 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
@@ -78,7 +78,7 @@
 /**
  * Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
  */
-public inline suspend fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit) =
+public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit) =
     consume {
         for (element in this) action(element)
     }
@@ -93,24 +93,78 @@
 // -------- Operations on ReceiveChannel --------
 
 /**
+ * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on the [ReceiveChannel]
+ * with the corresponding cause. See also [ReceiveChannel.consume].
+ *
+ * **WARNING**: It is planned that in the future a second invocation of this method
+ * on an channel that is already being consumed is going to fail fast, that is
+ * immediately throw an [IllegalStateException].
+ * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
+ * for details.
+ */
+public fun ReceiveChannel<*>.consumes(): CompletionHandler =
+    { cause: Throwable? -> cancel(cause) }
+
+/**
+ * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on all the
+ * specified [ReceiveChannel] instances with the corresponding cause.
+ * See also [ReceiveChannel.consumes()] for a version on one channel.
+ */
+public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
+    { cause: Throwable? ->
+        var exception: Throwable? = null
+        for (channel in channels)
+            try {
+                channel.cancel(cause)
+            } catch (e: Throwable) {
+                if (exception == null) {
+                    exception = e
+                } else {
+                    exception.addSuppressed(e)
+                }
+            }
+        exception?.let { throw it }
+    }
+
+/**
  * Makes sure that the given [block] consumes all elements from the given channel
  * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
+ *
+ * **WARNING**: It is planned that in the future a second invocation of this method
+ * on an channel that is already being consumed is going to fail fast, that is
+ * immediately throw an [IllegalStateException].
+ * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
+ * for details.
+ *
+ * The operation is _terminal_.
  */
-public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R =
+public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
+    var cause: Throwable? = null
     try {
-        block()
+        return block()
+    } catch (e: Throwable) {
+        cause = e
+        throw e
     } finally {
-        cancel()
+        cancel(cause)
     }
+}
 
 /**
  * Performs the given [action] for each received element.
  *
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * **WARNING**: It is planned that in the future a second invocation of this method
+ * on an channel that is already being consumed is going to fail fast, that is
+ * immediately throw an [IllegalStateException].
+ * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
+ * for details.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) =
+public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) =
     consume {
-        for (element in this) action(element)
+        for (e in this) action(e)
     }
 
 /**
@@ -123,9 +177,10 @@
 /**
  * Performs the given [action] for each received element.
  *
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) {
+public suspend inline fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) {
     var index = 0
     consumeEach {
         action(IndexedValue(index++, it))
@@ -136,7 +191,7 @@
  * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E =
     elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") }
@@ -145,9 +200,9 @@
  * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E =
+public suspend inline fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E =
     consume {
         if (index < 0)
             return defaultValue(index)
@@ -163,7 +218,7 @@
  * Returns an element at the given [index] or `null` if the [index] is out of bounds of this channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
     consume {
@@ -181,18 +236,18 @@
  * Returns the first element matching the given [predicate], or `null` if no such element was found.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? =
+public suspend inline fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? =
     firstOrNull(predicate)
 
 /**
  * Returns the last element matching the given [predicate], or `null` if no such element was found.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? =
+public suspend inline fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? =
     lastOrNull(predicate)
 
 /**
@@ -200,7 +255,7 @@
  * @throws [NoSuchElementException] if the channel is empty.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.first(): E =
     consume {
@@ -215,9 +270,9 @@
  * @throws [NoSuchElementException] if no such element is found.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E {
+public suspend inline fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E {
     consumeEach {
         if (predicate(it)) return it
     }
@@ -228,7 +283,7 @@
  * Returns the first element, or `null` if the channel is empty.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
     consume {
@@ -242,7 +297,7 @@
  * Returns the first element matching the given [predicate], or `null` if element was not found.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public inline suspend fun <E> ReceiveChannel<E>.firstOrNull(predicate: (E) -> Boolean): E? {
     consumeEach {
@@ -255,7 +310,7 @@
  * Returns first index of [element], or -1 if the channel does not contain element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
     var index = 0
@@ -271,9 +326,9 @@
  * Returns index of the first element matching the given [predicate], or -1 if the channel does not contain such element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int {
+public suspend inline fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int {
     var index = 0
     consumeEach {
         if (predicate(it))
@@ -287,7 +342,7 @@
  * Returns index of the last element matching the given [predicate], or -1 if the channel does not contain such element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public inline suspend fun <E> ReceiveChannel<E>.indexOfLast(predicate: (E) -> Boolean): Int {
     var lastIndex = -1
@@ -305,7 +360,7 @@
  * @throws [NoSuchElementException] if the channel is empty.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.last(): E =
     consume {
@@ -323,9 +378,9 @@
  * @throws [NoSuchElementException] if no such element is found.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E {
+public suspend inline fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E {
     var last: E? = null
     var found = false
     consumeEach {
@@ -343,7 +398,7 @@
  * Returns last index of [element], or -1 if the channel does not contain element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
     var lastIndex = -1
@@ -360,7 +415,7 @@
  * Returns the last element, or `null` if the channel is empty.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
     consume {
@@ -377,9 +432,9 @@
  * Returns the last element matching the given [predicate], or `null` if no such element was found.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? {
+public suspend inline fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? {
     var last: E? = null
     consumeEach {
         if (predicate(it)) {
@@ -393,7 +448,7 @@
  * Returns the single element, or throws an exception if the channel is empty or has more than one element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.single(): E =
     consume {
@@ -410,9 +465,9 @@
  * Returns the single element matching the given [predicate], or throws exception if there is no or more than one matching element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E {
+public suspend inline fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E {
     var single: E? = null
     var found = false
     consumeEach {
@@ -431,7 +486,7 @@
  * Returns single element, or `null` if the channel is empty or has more than one element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
     consume {
@@ -448,9 +503,9 @@
  * Returns the single element matching the given [predicate], or `null` if element was not found or more than one element was found.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? {
+public suspend inline fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? {
     var single: E? = null
     var found = false
     consumeEach {
@@ -468,22 +523,20 @@
  * Returns a channel containing all elements except first [n] elements.
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
-    produce(context) {
-        consume {
-            require(n >= 0) { "Requested element count $n is less than zero." }
-            var remaining: Int = n
-            if (remaining > 0)
-                for (element in this@drop) {
-                    remaining--
-                    if (remaining == 0)
-                        break
-                }
-            for (element in this@drop) {
-                send(element)
+    produce(context, onCompletion = consumes()) {
+        require(n >= 0) { "Requested element count $n is less than zero." }
+        var remaining: Int = n
+        if (remaining > 0)
+            for (e in this@drop) {
+                remaining--
+                if (remaining == 0)
+                    break
             }
+        for (e in this@drop) {
+            send(e)
         }
     }
 
@@ -491,21 +544,19 @@
  * Returns a channel containing all elements except first elements that satisfy the given [predicate].
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 // todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
 public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
-    produce(context) {
-        consume {
-            for (element in this@dropWhile) {
-                if (!predicate(element)) {
-                    send(element)
-                    break
-                }
+    produce(context, onCompletion = consumes()) {
+        for (e in this@dropWhile) {
+            if (!predicate(e)) {
+                send(e)
+                break
             }
-            for (element in this@dropWhile) {
-                send(element)
-            }
+        }
+        for (e in this@dropWhile) {
+            send(e)
         }
     }
 
@@ -513,13 +564,13 @@
  * Returns a channel containing only elements matching the given [predicate].
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 // todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
 public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
-    produce(context) {
-        consumeEach {
-            if (predicate(it)) send(it)
+    produce(context, onCompletion = consumes()) {
+        for (e in this@filter) {
+            if (predicate(e)) send(e)
         }
     }
 
@@ -529,14 +580,14 @@
  * and returns the result of predicate evaluation on the element.
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 // todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
 public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> =
-    produce(context) {
+    produce(context, onCompletion = consumes()) {
         var index = 0
-        consumeEach {
-            if (predicate(index++, it)) send(it)
+        for (e in this@filterIndexed) {
+            if (predicate(index++, e)) send(e)
         }
     }
 
@@ -546,9 +597,9 @@
  * and returns the result of predicate evaluation on the element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
+public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
     consumeEachIndexed { (index, element) ->
         if (predicate(index, element)) destination.add(element)
     }
@@ -561,9 +612,9 @@
  * and returns the result of predicate evaluation on the element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
+public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
     consumeEachIndexed { (index, element) ->
         if (predicate(index, element)) destination.send(element)
     }
@@ -574,18 +625,13 @@
  * Returns a channel containing all elements not matching the given [predicate].
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 // todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
 public fun <E> ReceiveChannel<E>.filterNot(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
     filter(context) { !predicate(it) }
 
 /**
- * Returns a channel containing all elements not matching the given [predicate].
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
- *
  * @suppress **Deprecated**: For binary compatibility only
  */
 @Deprecated("For binary compatibility only", level = DeprecationLevel.HIDDEN)
@@ -595,7 +641,7 @@
  * Returns a channel containing all elements that are not `null`.
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 @Suppress("UNCHECKED_CAST")
 public fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
@@ -605,7 +651,7 @@
  * Appends all elements that are not `null` to the given [destination].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
     consumeEach {
@@ -618,7 +664,7 @@
  * Appends all elements that are not `null` to the given [destination].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
     consumeEach {
@@ -631,9 +677,9 @@
  * Appends all elements not matching the given [predicate] to the given [destination].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
+public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
     consumeEach {
         if (!predicate(it)) destination.add(it)
     }
@@ -644,9 +690,9 @@
  * Appends all elements not matching the given [predicate] to the given [destination].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
+public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
     consumeEach {
         if (!predicate(it)) destination.send(it)
     }
@@ -657,9 +703,9 @@
  * Appends all elements matching the given [predicate] to the given [destination].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
+public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
     consumeEach {
         if (predicate(it)) destination.add(it)
     }
@@ -670,9 +716,9 @@
  * Appends all elements matching the given [predicate] to the given [destination].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
+public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
     consumeEach {
         if (predicate(it)) destination.send(it)
     }
@@ -683,20 +729,18 @@
  * Returns a channel containing first [n] elements.
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
-    produce(context) {
-        consume {
-            if (n == 0) return@produce
-            require(n >= 0) { "Requested element count $n is less than zero." }
-            var remaining: Int = n
-            for (element in this@take) {
-                send(element)
-                remaining--
-                if (remaining == 0)
-                    return@produce
-            }
+    produce(context, onCompletion = consumes()) {
+        if (n == 0) return@produce
+        require(n >= 0) { "Requested element count $n is less than zero." }
+        var remaining: Int = n
+        for (e in this@take) {
+            send(e)
+            remaining--
+            if (remaining == 0)
+                return@produce
         }
     }
 
@@ -704,14 +748,14 @@
  * Returns a channel containing first elements satisfying the given [predicate].
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 // todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
 public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
-    produce(context) {
-        consumeEach {
-            if (!predicate(it)) return@produce
-            send(it)
+    produce(context, onCompletion = consumes()) {
+        for (e in this@takeWhile) {
+            if (!predicate(e)) return@produce
+            send(e)
         }
     }
 
@@ -724,9 +768,9 @@
  * The returned map preserves the entry iteration order of the original channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> =
+public suspend inline fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> =
     associateTo(LinkedHashMap(), transform)
 
 /**
@@ -738,9 +782,9 @@
  * The returned map preserves the entry iteration order of the original channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> =
+public suspend inline fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> =
     associateByTo(LinkedHashMap(), keySelector)
 
 /**
@@ -751,9 +795,9 @@
  * The returned map preserves the entry iteration order of the original channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> =
+public suspend inline fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> =
     associateByTo(LinkedHashMap(), keySelector, valueTransform)
 
 /**
@@ -764,9 +808,9 @@
  * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M {
+public suspend inline fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M {
     consumeEach {
         destination.put(keySelector(it), it)
     }
@@ -781,9 +825,9 @@
  * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
+public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
     consumeEach {
         destination.put(keySelector(it), valueTransform(it))
     }
@@ -797,9 +841,9 @@
  * If any of two pairs would have the same key the last one gets added to the map.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M {
+public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M {
     consumeEach {
         destination += transform(it)
     }
@@ -811,7 +855,7 @@
  * and appends the results to the given [destination].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
     consumeEach {
@@ -824,7 +868,7 @@
  * Appends all elements to the given [destination] collection.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
     consumeEach {
@@ -837,7 +881,7 @@
  * Returns a [List] containing all elements.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.toList(): List<E> =
     this.toMutableList()
@@ -846,7 +890,7 @@
  * Returns a [Map] filled with all elements of this channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> =
     toMap(LinkedHashMap())
@@ -855,7 +899,7 @@
  * Returns a [MutableMap] filled with all elements of this channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
     consumeEach {
@@ -868,7 +912,7 @@
  * Returns a [MutableList] filled with all elements of this channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> =
     toCollection(ArrayList())
@@ -879,22 +923,22 @@
  * The returned set preserves the element iteration order of the original channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> =
-    toCollection(LinkedHashSet())
+    this.toMutableSet()
 
 /**
  * Returns a single channel of all elements from results of [transform] function being invoked on each element of original channel.
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 // todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
 public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
-    produce(context) {
-        consumeEach {
-            transform(it).toChannel(this)
+    produce(context, onCompletion = consumes()) {
+        for (e in this@flatMap) {
+            transform(e).toChannel(this)
         }
     }
 
@@ -905,9 +949,9 @@
  * The returned map preserves the entry iteration order of the keys produced from the original channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> =
+public suspend inline fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> =
     groupByTo(LinkedHashMap(), keySelector)
 
 /**
@@ -918,9 +962,9 @@
  * The returned map preserves the entry iteration order of the keys produced from the original channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> =
+public suspend inline fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> =
     groupByTo(LinkedHashMap(), keySelector, valueTransform)
 
 /**
@@ -930,9 +974,9 @@
  * @return The [destination] map.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M {
+public suspend inline fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M {
     consumeEach {
         val key = keySelector(it)
         val list = destination.getOrPut(key) { ArrayList() }
@@ -949,9 +993,9 @@
  * @return The [destination] map.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
+public suspend inline fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
     consumeEach {
         val key = keySelector(it)
         val list = destination.getOrPut(key) { ArrayList() }
@@ -965,11 +1009,11 @@
  * to each element in the original channel.
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
 public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
-    produce(context) {
+    produce(context, onCompletion = consumes()) {
         consumeEach {
             send(transform(it))
         }
@@ -982,14 +1026,14 @@
  * and returns the result of the transform applied to the element.
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 // todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
 public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> =
-    produce(context) {
+    produce(context, onCompletion = consumes()) {
         var index = 0
-        consumeEach {
-            send(transform(index++, it))
+        for (e in this@mapIndexed) {
+            send(transform(index++, e))
         }
     }
 
@@ -1000,7 +1044,7 @@
  * and returns the result of the transform applied to the element.
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 // todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
 public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(context: CoroutineContext = Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel<R> =
@@ -1013,9 +1057,9 @@
  * and returns the result of the transform applied to the element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
+public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
     consumeEachIndexed { (index, element) ->
         transform(index, element)?.let { destination.add(it) }
     }
@@ -1029,9 +1073,9 @@
  * and returns the result of the transform applied to the element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
+public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
     consumeEachIndexed { (index, element) ->
         transform(index, element)?.let { destination.send(it) }
     }
@@ -1045,9 +1089,9 @@
  * and returns the result of the transform applied to the element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
+public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
     var index = 0
     consumeEach {
         destination.add(transform(index++, it))
@@ -1062,9 +1106,9 @@
  * and returns the result of the transform applied to the element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
+public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
     var index = 0
     consumeEach {
         destination.send(transform(index++, it))
@@ -1077,7 +1121,7 @@
  * to each element in the original channel.
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 // todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
 public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(context: CoroutineContext = Unconfined, transform: suspend (E) -> R?): ReceiveChannel<R> =
@@ -1088,9 +1132,9 @@
  * and appends only the non-null results to the given [destination].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
+public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
     consumeEach {
         transform(it)?.let { destination.add(it) }
     }
@@ -1102,9 +1146,9 @@
  * and appends only the non-null results to the given [destination].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
+public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
     consumeEach {
         transform(it)?.let { destination.send(it) }
     }
@@ -1116,9 +1160,9 @@
  * and appends the results to the given [destination].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
+public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
     consumeEach {
         destination.add(transform(it))
     }
@@ -1130,9 +1174,9 @@
  * and appends the results to the given [destination].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
+public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
     consumeEach {
         destination.send(transform(it))
     }
@@ -1143,13 +1187,13 @@
  * Returns a channel of [IndexedValue] for each element of the original channel.
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Unconfined): ReceiveChannel<IndexedValue<E>> =
-    produce(context) {
+    produce(context, onCompletion = consumes()) {
         var index = 0
-        consumeEach {
-            send(IndexedValue(index++, it))
+        for (e in this@withIndex) {
+            send(IndexedValue(index++, e))
         }
     }
 
@@ -1159,7 +1203,7 @@
  * The elements in the resulting channel are in the same order as they were in the source channel.
  *
  * The operation is _intermediate_ and _stateful_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> =
     this.distinctBy { it }
@@ -1171,16 +1215,16 @@
  * The elements in the resulting channel are in the same order as they were in the source channel.
  *
  * The operation is _intermediate_ and _stateful_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 // todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
 public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> =
-    produce(context) {
+    produce(context, onCompletion = consumes()) {
         val keys = HashSet<K>()
-        consumeEach {
-            val k = selector(it)
+        for (e in this@distinctBy) {
+            val k = selector(e)
             if (k !in keys) {
-                send(it)
+                send(e)
                 keys += k
             }
         }
@@ -1192,23 +1236,18 @@
  * The returned set preserves the element iteration order of the original channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> {
-    val set = LinkedHashSet<E>()
-    consumeEach {
-        set.add(it)
-    }
-    return set
-}
+public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> =
+    toCollection(LinkedHashSet())
 
 /**
  * Returns `true` if all elements match the given [predicate].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean {
+public suspend inline fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean {
     consumeEach {
         if (!predicate(it)) return false
     }
@@ -1219,7 +1258,7 @@
  * Returns `true` if channel has at least one element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
     consume {
@@ -1230,9 +1269,9 @@
  * Returns `true` if at least one element matches the given [predicate].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean {
+public suspend inline fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean {
     consumeEach {
         if (predicate(it)) return true
     }
@@ -1243,7 +1282,7 @@
  * Returns the number of elements in this channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.count(): Int {
     var count = 0
@@ -1255,9 +1294,9 @@
  * Returns the number of elements matching the given [predicate].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int {
+public suspend inline fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int {
     var count = 0
     consumeEach {
         if (predicate(it)) count++
@@ -1269,9 +1308,9 @@
  * Accumulates value starting with [initial] value and applying [operation] from left to right to current accumulator value and each element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
+public suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
     var accumulator = initial
     consumeEach {
         accumulator = operation(accumulator, it)
@@ -1286,9 +1325,9 @@
  * and the element itself, and calculates the next accumulator value.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R {
+public suspend inline fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R {
     var index = 0
     var accumulator = initial
     consumeEach {
@@ -1301,9 +1340,9 @@
  * Returns the first element yielding the largest value of the given function or `null` if there are no elements.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? =
+public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? =
     consume {
         val iterator = iterator()
         if (!iterator.hasNext()) return null
@@ -1324,7 +1363,7 @@
  * Returns the first element having the largest value according to the provided [comparator] or `null` if there are no elements.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
     consume {
@@ -1342,9 +1381,9 @@
  * Returns the first element yielding the smallest value of the given function or `null` if there are no elements.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? =
+public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? =
     consume {
         val iterator = iterator()
         if (!iterator.hasNext()) return null
@@ -1365,7 +1404,7 @@
  * Returns the first element having the smallest value according to the provided [comparator] or `null` if there are no elements.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
     consume {
@@ -1383,7 +1422,7 @@
  * Returns `true` if the channel has no elements.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
     consume {
@@ -1394,9 +1433,9 @@
  * Returns `true` if no elements match the given [predicate].
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean {
+public suspend inline fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean {
     consumeEach {
         if (predicate(it)) return false
     }
@@ -1407,9 +1446,9 @@
  * Accumulates value starting with the first element and applying [operation] from left to right to current accumulator value and each element.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S =
+public suspend inline fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S =
     consume {
         val iterator = this.iterator()
         if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
@@ -1427,9 +1466,10 @@
  * and the element itself and calculates the next accumulator value.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S =
+// todo: mark operation with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
+public suspend inline fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S =
     consume {
         val iterator = this.iterator()
         if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
@@ -1445,9 +1485,9 @@
  * Returns the sum of all values produced by [selector] function applied to each element in the channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int {
+public suspend inline fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int {
     var sum = 0
     consumeEach {
         sum += selector(it)
@@ -1459,9 +1499,9 @@
  * Returns the sum of all values produced by [selector] function applied to each element in the channel.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double {
+public suspend inline fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double {
     var sum = 0.0
     consumeEach {
         sum += selector(it)
@@ -1473,7 +1513,7 @@
  * Returns an original collection containing all the non-`null` elements, throwing an [IllegalArgumentException] if there are any `null` elements.
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
 public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
     map { it ?: throw IllegalArgumentException("null element found in $this.") }
@@ -1484,9 +1524,9 @@
  * while *second* list contains elements for which [predicate] yielded `false`.
  *
  * The operation is _terminal_.
- * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
  */
-public inline suspend fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> {
+public suspend inline fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> {
     val first = ArrayList<E>()
     val second = ArrayList<E>()
     consumeEach {
@@ -1504,7 +1544,7 @@
  * Resulting channel has length of shortest input channel.
  *
  * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of both the original [ReceiveChannel] and the `other` one.
+ * This function [consumes][ReceiveChannel.consume] all elements of both the original [ReceiveChannel] and the `other` one.
  */
 public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
     zip(other) { t1, t2 -> t1 to t2 }
@@ -1515,15 +1555,15 @@
  * The operation is _intermediate_ and _stateless_.
  * This function [consumes][consume] all elements of both the original [ReceiveChannel] and the `other` one.
  */
+// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
 public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
-    produce(context) {
-        other.consume {
-            val otherIterator = other.iterator()
-            this@zip.consumeEach { element1 ->
-                if (!otherIterator.hasNext()) return@consumeEach
-                val element2 = otherIterator.next()
-                send(transform(element1, element2))
-            }
+    produce(context, onCompletion = consumesAll(this, other)) {
+        val otherIterator = other.iterator()
+        this@zip.consumeEach { element1 ->
+            if (!otherIterator.hasNext()) return@consumeEach
+            val element2 = otherIterator.next()
+            send(transform(element1, element2))
         }
     }
 
+
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
index 61e6a6f..1565ecd 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
@@ -70,17 +70,20 @@
  * @param context context of the coroutine. The default value is [DefaultDispatcher].
  * @param capacity capacity of the channel's buffer (no buffer by default).
  * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
+ * @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
  * @param block the coroutine code.
  */
 public fun <E> produce(
     context: CoroutineContext = DefaultDispatcher,
     capacity: Int = 0,
     parent: Job? = null,
+    onCompletion: CompletionHandler? = null,
     block: suspend ProducerScope<E>.() -> Unit
 ): ReceiveChannel<E> {
     val channel = Channel<E>(capacity)
     val newContext = newCoroutineContext(context, parent)
     val coroutine = ProducerCoroutine(newContext, channel)
+    if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
     coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
     return coroutine
 }
@@ -90,6 +93,15 @@
 public fun <E> produce(
     context: CoroutineContext = DefaultDispatcher,
     capacity: Int = 0,
+    parent: Job? = null,
+    block: suspend ProducerScope<E>.() -> Unit
+): ReceiveChannel<E> = produce(context, capacity, parent, block = block)
+
+/** @suppress **Deprecated**: Binary compatibility */
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> produce(
+    context: CoroutineContext = DefaultDispatcher,
+    capacity: Int = 0,
     block: suspend ProducerScope<E>.() -> Unit
 ): ProducerJob<E> =
     produce(context, capacity, block = block) as ProducerJob<E>
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsConsumeTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsConsumeTest.kt
new file mode 100644
index 0000000..18c3234
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsConsumeTest.kt
@@ -0,0 +1,911 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+/**
+ * Tests that various operators on channels properly consume (close) their source channels.
+ */
+class ChannelsConsumeTest {
+    private val sourceList = (1..10).toList()
+
+    // test source with numbers 1..10
+    private fun testSource() = produce {
+        for (i in sourceList) {
+            send(i)
+        }
+    }
+
+    @Test
+    fun testConsume() {
+        checkTerminal {
+            consume {
+                assertEquals(1, receive())
+            }
+        }
+    }
+
+    @Test
+    fun testConsumeEach() {
+        checkTerminal {
+            var sum = 0
+            consumeEach { sum += it }
+            assertEquals(55, sum)
+        }
+    }
+
+    @Test
+    fun testConsumeEachIndexed() {
+        checkTerminal {
+            var sum = 0
+            consumeEachIndexed { (index, i) -> sum += index * i }
+            assertEquals(330, sum)
+        }
+    }
+
+    @Test
+    fun testElementAt() {
+        checkTerminal {
+            assertEquals(2, elementAt(1))
+        }
+        checkTerminal(expected = { it is IndexOutOfBoundsException }) {
+            elementAt(10)
+        }
+    }
+
+    @Test
+    fun testElementAtOrElse() {
+        checkTerminal {
+            assertEquals(3, elementAtOrElse(2) { error("Cannot happen") })
+        }
+        checkTerminal {
+            assertEquals(-23, elementAtOrElse(10) { -23 })
+        }
+    }
+
+    @Test
+    fun testElementOrNull() {
+        checkTerminal {
+            assertEquals(4, elementAtOrNull(3))
+        }
+        checkTerminal {
+            assertEquals(null, elementAtOrNull(10))
+        }
+    }
+
+    @Test
+    fun testFind() {
+        checkTerminal {
+            assertEquals(3, find { it % 3 == 0 })
+        }
+    }
+
+    @Test
+    fun testFindLast() {
+        checkTerminal {
+            assertEquals(9, findLast { it % 3 == 0 })
+        }
+    }
+
+    @Test
+    fun testFirst() {
+        checkTerminal {
+            assertEquals(1, first())
+        }
+    }
+
+    @Test
+    fun testFirstPredicate() {
+        checkTerminal {
+            assertEquals(3, first { it % 3 == 0 })
+        }
+        checkTerminal(expected = { it is NoSuchElementException }) {
+            first { it > 10 }
+        }
+    }
+
+    @Test
+    fun testFirstOrNull() {
+        checkTerminal {
+            assertEquals(1, firstOrNull())
+        }
+    }
+
+    @Test
+    fun testFirstOrNullPredicate() {
+        checkTerminal {
+            assertEquals(3, firstOrNull { it % 3 == 0 })
+        }
+        checkTerminal {
+            assertEquals(null, firstOrNull { it > 10 })
+        }
+    }
+
+    @Test
+    fun testIndexOf() {
+        checkTerminal {
+            assertEquals(2, indexOf(3))
+        }
+        checkTerminal {
+            assertEquals(-1, indexOf(11))
+        }
+    }
+
+    @Test
+    fun testIndexOfFirst() {
+        checkTerminal {
+            assertEquals(2, indexOfFirst { it % 3 == 0 })
+        }
+        checkTerminal {
+            assertEquals(-1, indexOfFirst { it > 10 })
+        }
+    }
+
+    @Test
+    fun testIndexOfLast() {
+        checkTerminal {
+            assertEquals(8, indexOfLast { it % 3 == 0 })
+        }
+        checkTerminal {
+            assertEquals(-1, indexOfLast { it > 10 })
+        }
+    }
+
+    @Test
+    fun testLast() {
+        checkTerminal {
+            assertEquals(10, last())
+        }
+    }
+
+    @Test
+    fun testLastPredicate() {
+        checkTerminal {
+            assertEquals(9, last { it % 3 == 0 })
+        }
+        checkTerminal(expected = { it is NoSuchElementException }) {
+            last { it > 10 }
+        }
+    }
+
+    @Test
+    fun testLastIndexOf() {
+        checkTerminal {
+            assertEquals(8, lastIndexOf(9))
+        }
+    }
+
+    @Test
+    fun testLastOrNull() {
+        checkTerminal {
+            assertEquals(10, lastOrNull())
+        }
+    }
+
+    @Test
+    fun testLastOrNullPredicate() {
+        checkTerminal {
+            assertEquals(9, lastOrNull { it % 3 == 0 })
+        }
+        checkTerminal {
+            assertEquals(null, lastOrNull { it > 10 })
+        }
+    }
+
+    @Test
+    fun testSingle() {
+        checkTerminal(expected = { it is IllegalArgumentException }) {
+            single()
+        }
+    }
+
+    @Test
+    fun testSinglePredicate() {
+        checkTerminal {
+            assertEquals(7, single { it % 7 == 0 })
+        }
+        checkTerminal(expected = { it is IllegalArgumentException }) {
+            single { it % 3 == 0 }
+        }
+        checkTerminal(expected = { it is NoSuchElementException }) {
+            single { it > 10 }
+        }
+    }
+
+    @Test
+    fun testSingleOrNull() {
+        checkTerminal {
+            assertEquals(null, singleOrNull())
+        }
+    }
+
+    @Test
+    fun testSingleOrNullPredicate() {
+        checkTerminal {
+            assertEquals(7, singleOrNull { it % 7 == 0 })
+        }
+        checkTerminal {
+            assertEquals(null, singleOrNull { it % 3 == 0 })
+        }
+        checkTerminal {
+            assertEquals(null, singleOrNull { it > 10 })
+        }
+    }
+
+    @Test
+    fun testDrop() {
+        checkTransform(sourceList.drop(3)) { ctx ->
+            drop(3, ctx)
+        }
+    }
+
+    @Test
+    fun testDropWhile() {
+        checkTransform(sourceList.dropWhile { it < 4}) { ctx ->
+            dropWhile(ctx) { it < 4 }
+        }
+    }
+
+    @Test
+    fun testFilter() {
+        checkTransform(sourceList.filter { it % 2 == 0 }) { ctx ->
+            filter(ctx) { it % 2 == 0 }
+        }
+    }
+
+    @Test
+    fun testFilterIndexed() {
+        checkTransform(sourceList.filterIndexed { index, _ -> index % 2 == 0 }) { ctx ->
+            filterIndexed(ctx) { index, _ -> index % 2 == 0 }
+        }
+    }
+
+    @Test
+    fun testFilterIndexedToCollection() {
+        checkTerminal {
+            val list = mutableListOf<Int>()
+            filterIndexedTo(list) { index, _ -> index % 2 == 0 }
+            assertEquals(listOf(1, 3, 5, 7, 9), list)
+        }
+    }
+
+    @Test
+    fun testFilterIndexedToChannel() {
+        checkTerminal {
+            val channel = Channel<Int>()
+            val result = async { channel.toList() }
+            filterIndexedTo(channel) { index, _ -> index % 2 == 0 }
+            channel.close()
+            assertEquals(listOf(1, 3, 5, 7, 9), result.await())
+        }
+    }
+
+    @Test
+    fun testFilterNot() {
+        checkTransform(sourceList.filterNot { it % 2 == 0 }) { ctx ->
+            filterNot(ctx) { it % 2 == 0 }
+        }
+    }
+
+    @Test
+    fun testFilterNotNullToCollection() {
+        checkTerminal {
+            val list = mutableListOf<Int>()
+            filterNotNullTo(list)
+            assertEquals((1..10).toList(), list)
+        }
+    }
+
+    @Test
+    fun testFilterNotNullToChannel() {
+        checkTerminal {
+            val channel = Channel<Int>()
+            val result = async { channel.toList() }
+            filterNotNullTo(channel)
+            channel.close()
+            assertEquals((1..10).toList(), result.await())
+        }
+    }
+
+    @Test
+    fun testFilterNotToCollection() {
+        checkTerminal {
+            val list = mutableListOf<Int>()
+            filterNotTo(list) { it % 2 == 0 }
+            assertEquals(listOf(1, 3, 5, 7, 9), list)
+        }
+    }
+
+    @Test
+    fun testFilterNotToChannel() {
+        checkTerminal {
+            val channel = Channel<Int>()
+            val result = async { channel.toList() }
+            filterNotTo(channel) { it % 2 == 0 }
+            channel.close()
+            assertEquals(listOf(1, 3, 5, 7, 9), result.await())
+        }
+    }
+
+    @Test
+    fun testFilterToCollection() {
+        checkTerminal {
+            val list = mutableListOf<Int>()
+            filterTo(list) { it % 2 == 0 }
+            assertEquals(listOf(2, 4, 6, 8, 10), list)
+        }
+    }
+
+    @Test
+    fun testFilterToChannel() {
+        checkTerminal {
+            val channel = Channel<Int>()
+            val result = async { channel.toList() }
+            filterTo(channel) { it % 2 == 0 }
+            channel.close()
+            assertEquals(listOf(2, 4, 6, 8, 10), result.await())
+        }
+    }
+
+    @Test
+    fun testTake() {
+        checkTransform(sourceList.take(3)) { ctx ->
+            take(3, ctx)
+        }
+    }
+
+    @Test
+    fun testTakeWhile() {
+        checkTransform(sourceList.takeWhile { it < 4 }) { ctx ->
+            takeWhile(ctx) { it < 4 }
+        }
+    }
+
+    @Test
+    fun testAssociate() {
+        checkTerminal {
+            assertEquals(sourceList.associate { it to it.toString() }, associate { it to it.toString() })
+        }
+    }
+
+    @Test
+    fun testAssociateBy() {
+        checkTerminal {
+            assertEquals(sourceList.associateBy { it.toString() }, associateBy { it.toString() })
+        }
+    }
+
+    @Test
+    fun testAssociateByTwo() {
+        checkTerminal {
+            assertEquals(sourceList.associateBy({ it.toString() }, { it + 1}), associateBy({ it.toString() }, { it + 1}))
+        }
+    }
+
+    @Test
+    fun testAssociateByToMap() {
+        checkTerminal {
+            val map = mutableMapOf<String, Int>()
+            associateByTo(map) { it.toString() }
+            assertEquals(sourceList.associateBy { it.toString() }, map)
+        }
+    }
+
+    @Test
+    fun testAssociateByTwoToMap() {
+        checkTerminal {
+            val map = mutableMapOf<String, Int>()
+            associateByTo(map, { it.toString() }, { it + 1})
+            assertEquals(sourceList.associateBy({ it.toString() }, { it + 1}), map)
+        }
+    }
+
+    @Test
+    fun testAssociateToMap() {
+        checkTerminal {
+            val map = mutableMapOf<Int, String>()
+            associateTo(map) { it to it.toString() }
+            assertEquals(sourceList.associate { it to it.toString() }, map)
+        }
+    }
+
+    @Test
+    fun testToChannel() {
+        checkTerminal {
+            val channel = Channel<Int>()
+            val result = async { channel.toList() }
+            toChannel(channel)
+            channel.close()
+            assertEquals(sourceList, result.await())
+        }
+    }
+
+    @Test
+    fun testToCollection() {
+        checkTerminal {
+            val list = mutableListOf<Int>()
+            toCollection(list)
+            assertEquals(sourceList, list)
+        }
+    }
+
+    @Test
+    fun testToList() {
+        checkTerminal {
+            val list = toList()
+            assertEquals(sourceList, list)
+        }
+    }
+
+    @Test
+    fun testToMap() {
+        checkTerminal {
+            val map = map { it to it.toString() }.toMap()
+            assertEquals(sourceList.map { it to it.toString() }.toMap(), map)
+        }
+    }
+
+    @Test
+    fun testToMapWithMap() {
+        checkTerminal {
+            val map = mutableMapOf<Int, String>()
+            map { it to it.toString() }.toMap(map)
+            assertEquals(sourceList.map { it to it.toString() }.toMap(), map)
+        }
+    }
+
+    @Test
+    fun testToMutableList() {
+        checkTerminal {
+            val list = toMutableList()
+            assertEquals(sourceList, list)
+        }
+    }
+
+    @Test
+    fun testToSet() {
+        checkTerminal {
+            val set = toSet()
+            assertEquals(sourceList.toSet(), set)
+        }
+    }
+
+    @Test
+    fun testFlatMap() {
+        checkTransform(sourceList.flatMap { listOf("A$it", "B$it") }) { ctx ->
+            flatMap(ctx) {
+                produce {
+                    send("A$it")
+                    send("B$it")
+                }
+            }
+        }
+    }
+
+    @Test
+    fun testGroupBy() {
+        checkTerminal {
+            val map = groupBy { it % 2 }
+            assertEquals(sourceList.groupBy { it % 2 }, map)
+        }
+    }
+
+    @Test
+    fun testGroupByTwo() {
+        checkTerminal {
+            val map = groupBy({ it % 2 }, { it.toString() })
+            assertEquals(sourceList.groupBy({ it % 2 }, { it.toString() }), map)
+        }
+    }
+
+    @Test
+    fun testGroupByTo() {
+        checkTerminal {
+            val map = mutableMapOf<Int, MutableList<Int>>()
+            groupByTo(map) { it % 2 }
+            assertEquals(sourceList.groupBy { it % 2 }, map)
+        }
+    }
+
+    @Test
+    fun testGroupByToTwo() {
+        checkTerminal {
+            val map = mutableMapOf<Int, MutableList<String>>()
+            groupByTo(map, { it % 2 }, { it.toString() })
+            assertEquals(sourceList.groupBy({ it % 2 }, { it.toString() }), map)
+        }
+    }
+
+    @Test
+    fun testMap() {
+        checkTransform(sourceList.map { it.toString() }) { ctx ->
+            map(ctx) { it.toString() }
+        }
+    }
+
+    @Test
+    fun testMapIndexed() {
+        checkTransform(sourceList.mapIndexed { index, v -> "$index$v" }) { ctx ->
+            mapIndexed(ctx) { index, v -> "$index$v" }
+        }
+    }
+
+    @Test
+    fun testMapIndexedNotNull() {
+        checkTransform(sourceList.mapIndexedNotNull { index, v -> "$index$v".takeIf { v % 2 == 0 } }) { ctx ->
+            mapIndexedNotNull(ctx) { index, v -> "$index$v".takeIf { v % 2 == 0 } }
+        }
+    }
+
+    @Test
+    fun testMapIndexedNotNullToCollection() {
+        checkTerminal {
+            val list = mutableListOf<String>()
+            mapIndexedNotNullTo(list) { index, v -> "$index$v".takeIf { v % 2 == 0 } }
+            assertEquals(sourceList.mapIndexedNotNull { index, v -> "$index$v".takeIf { v % 2 == 0 } }, list)
+        }
+    }
+
+    @Test
+    fun testMapIndexedNotNullToChannel() {
+        checkTerminal {
+            val channel = Channel<String>()
+            val result = async { channel.toList() }
+            mapIndexedNotNullTo(channel) { index, v -> "$index$v".takeIf { v % 2 == 0 } }
+            channel.close()
+            assertEquals(sourceList.mapIndexedNotNull { index, v -> "$index$v".takeIf { v % 2 == 0 } }, result.await())
+        }
+    }
+
+    @Test
+    fun testMapIndexedToCollection() {
+        checkTerminal {
+            val list = mutableListOf<String>()
+            mapIndexedTo(list) { index, v -> "$index$v" }
+            assertEquals(sourceList.mapIndexed { index, v -> "$index$v" }, list)
+        }
+    }
+
+    @Test
+    fun testMapIndexedToChannel() {
+        checkTerminal {
+            val channel = Channel<String>()
+            val result = async { channel.toList() }
+            mapIndexedTo(channel) { index, v -> "$index$v" }
+            channel.close()
+            assertEquals(sourceList.mapIndexed { index, v -> "$index$v" }, result.await())
+        }
+    }
+
+    @Test
+    fun testMapNotNull() {
+        checkTransform(sourceList.mapNotNull { (it + 3).takeIf { it % 2 == 0 } }) { ctx ->
+            mapNotNull(ctx) { (it + 3).takeIf { it % 2 == 0 } }
+        }
+    }
+
+    @Test
+    fun testMapNotNullToCollection() {
+        checkTerminal {
+            val list = mutableListOf<Int>()
+            mapNotNullTo(list) { (it + 3).takeIf { it % 2 == 0 } }
+            assertEquals(sourceList.mapNotNull { (it + 3).takeIf { it % 2 == 0 } }, list)
+        }
+    }
+
+    @Test
+    fun testMapNotNullToChannel() {
+        checkTerminal {
+            val channel = Channel<Int>()
+            val result = async { channel.toList() }
+            mapNotNullTo(channel) { (it + 3).takeIf { it % 2 == 0 } }
+            channel.close()
+            assertEquals(sourceList.mapNotNull { (it + 3).takeIf { it % 2 == 0 } }, result.await())
+        }
+    }
+
+    @Test
+    fun testMapToCollection() {
+        checkTerminal {
+            val list = mutableListOf<Int>()
+            mapTo(list) { it + 3 }
+            assertEquals(sourceList.map { it + 3 }, list)
+        }
+    }
+
+    @Test
+    fun testMapToChannel() {
+        checkTerminal {
+            val channel = Channel<Int>()
+            val result = async { channel.toList() }
+            mapTo(channel) { it + 3 }
+            channel.close()
+            assertEquals(sourceList.map { it + 3 }, result.await())
+        }
+    }
+
+    @Test
+    fun testWithIndex() {
+        checkTransform(sourceList.withIndex().toList()) { ctx ->
+            withIndex(ctx)
+        }
+    }
+
+    @Test
+    fun testDistinctBy() {
+        checkTransform(sourceList.distinctBy { it / 2 }) { ctx ->
+            distinctBy(ctx) { it / 2 }
+        }
+    }
+
+    @Test
+    fun testToMutableSet() {
+        checkTerminal {
+            val set = toMutableSet()
+            assertEquals(sourceList.toSet(), set)
+        }
+    }
+
+    @Test
+    fun testAll() {
+        checkTerminal {
+            val all = all { it < 11 }
+            assertEquals(sourceList.all { it < 11 }, all)
+        }
+    }
+
+    @Test
+    fun testAny() {
+        checkTerminal {
+            val any = any()
+            assertEquals(sourceList.any(), any)
+        }
+    }
+
+    @Test
+    fun testAnyPredicate() {
+        checkTerminal {
+            val any = any { it % 3 == 0 }
+            assertEquals(sourceList.any { it % 3 == 0 }, any)
+        }
+    }
+    
+    @Test
+    fun testCount() {
+        checkTerminal {
+            val c = count()
+            assertEquals(sourceList.count(), c)
+        }
+    }
+
+    @Test
+    fun testCountPredicate() {
+        checkTerminal {
+            val c = count { it % 3 == 0 }
+            assertEquals(sourceList.count { it % 3 == 0 }, c)
+        }
+    }
+
+    @Test
+    fun testFold() {
+        checkTerminal {
+            val c = fold(1) { a, b -> a + b }
+            assertEquals(sourceList.fold(1) { a, b -> a + b }, c)
+        }
+    }
+
+    @Test
+    fun testFoldIndexed() {
+        checkTerminal {
+            val c = foldIndexed(1) { i, a, b -> i * a + b }
+            assertEquals(sourceList.foldIndexed(1) { i, a, b -> i * a + b }, c)
+        }
+    }
+
+    @Test
+    fun testMaxBy() {
+        checkTerminal {
+            val c = maxBy { it % 3 }
+            assertEquals(sourceList.maxBy { it % 3 }, c)
+        }
+    }
+
+    @Test
+    fun testMaxWith() {
+        checkTerminal {
+            val c = maxWith(compareBy { it % 3 })
+            assertEquals(sourceList.maxWith(compareBy { it % 3 }), c)
+        }
+    }
+
+    @Test
+    fun testMinBy() {
+        checkTerminal {
+            val c = maxBy { it % 3 }
+            assertEquals(sourceList.maxBy { it % 3 }, c)
+        }
+    }
+
+    @Test
+    fun testMinWith() {
+        checkTerminal {
+            val c = maxWith(compareBy { it % 3 })
+            assertEquals(sourceList.maxWith(compareBy { it % 3 }), c)
+        }
+    }
+
+    @Test
+    fun testNone() {
+        checkTerminal {
+            val none = none()
+            assertEquals(sourceList.none(), none)
+        }
+    }
+
+    @Test
+    fun testNonePredicate() {
+        checkTerminal {
+            val none = none { it > 10 }
+            assertEquals(sourceList.none { it > 10 }, none)
+        }
+    }
+
+    @Test
+    fun testReduce() {
+        checkTerminal {
+            val c = reduce { a, b -> a + b }
+            assertEquals(sourceList.reduce { a, b -> a + b }, c)
+        }
+    }
+
+    @Test
+    fun testReduceIndexed() {
+        checkTerminal {
+            val c = reduceIndexed { i, a, b -> i * a + b }
+            assertEquals(sourceList.reduceIndexed { i, a, b -> i * a + b }, c)
+        }
+    }
+
+    @Test
+    fun testSubBy() {
+        checkTerminal {
+            val c = sumBy { it }
+            assertEquals(sourceList.sumBy { it }, c)
+        }
+    }
+
+    @Test
+    fun testSubByDouble() {
+        checkTerminal {
+            val c = sumByDouble { it.toDouble() }
+            assertEquals(sourceList.sumByDouble { it.toDouble() }, c)
+        }
+    }
+
+    @Test
+    fun testPartition() {
+        checkTerminal {
+            val pair = partition { it % 2 == 0 }
+            assertEquals(sourceList.partition { it % 2 == 0 }, pair)
+        }
+    }
+
+    @Test
+    fun testZip() {
+        val expect = sourceList.zip(sourceList) { a, b -> a + 2 * b }
+        checkTransform(expect) { ctx ->
+            zip(testSource(), ctx) { a, b -> a + 2*b }
+        }
+        checkTransform(expect) { ctx ->
+            testSource().zip(this, ctx) { a, b -> a + 2*b }
+        }
+    }
+
+    // ------------------
+    
+    private fun checkTerminal(
+        expected: ((Throwable?) -> Unit)? = null,
+        terminal: suspend ReceiveChannel<Int>.() -> Unit
+    ) {
+        checkTerminalCompletion(expected, terminal)
+        checkTerminalCancellation(expected, terminal)
+    }
+
+    private fun checkTerminalCompletion(
+        expected: ((Throwable?) -> Unit)? = null,
+        terminal: suspend ReceiveChannel<Int>.() -> Unit
+    ) {
+        val src = testSource()
+        runBlocking {
+            try {
+                // terminal operation
+                terminal(src)
+                // source must be cancelled at the end of terminal op
+                assertTrue(src.isClosedForReceive, "Source must be closed")
+                if (expected != null) error("Exception was expected")
+            } catch (e: Throwable) {
+                if (expected == null) throw e
+                expected(e)
+            }
+        }
+    }
+
+    private fun checkTerminalCancellation(
+        expected: ((Throwable?) -> Unit)? = null,
+        terminal: suspend ReceiveChannel<Int>.() -> Unit
+    ) {
+        val src = testSource()
+        runBlocking {
+            // terminal operation in a separate async context started until the first suspension
+            val d = async(coroutineContext, start = CoroutineStart.UNDISPATCHED) {
+                terminal(src)
+            }
+            // then cancel it
+            d.cancel()
+            // and try to get it's result
+            try {
+                d.await()
+            } catch (e: CancellationException) {
+                // ok -- was cancelled
+            } catch (e: Throwable) {
+                // if threw a different exception -- must be an expected one
+                if (expected == null) throw e
+                expected(e)
+            }
+        }
+        // source must be cancelled at the end of terminal op even if it was cancelled while in process
+        assertTrue(src.isClosedForReceive, "Source must be closed")
+    }
+
+    private fun <R> checkTransform(
+        expect: List<R>,
+        transform: ReceiveChannel<Int>.(CoroutineContext) -> ReceiveChannel<R>
+    ) {
+        // check for varying number of received elements from the channel
+        for (nReceive in 0..expect.size) {
+            checkTransform(nReceive, expect, transform)
+        }
+    }
+
+    private fun <R> checkTransform(
+        nReceive: Int,
+        expect: List<R>,
+        transform: ReceiveChannel<Int>.(CoroutineContext) -> ReceiveChannel<R>
+    ) {
+        val src = testSource()
+        runBlocking {
+            // transform
+            val res = transform(src, coroutineContext)
+            // receive nReceive elements from the result
+            repeat(nReceive) { i ->
+                assertEquals(expect[i], res.receive())
+            }
+            if (nReceive < expect.size) {
+                // then cancel
+                res.cancel()
+            } else {
+                // then check that result is closed
+                assertEquals(null, res.receiveOrNull(), "Result has unexpected values")
+            }
+        }
+        // source must be cancelled when runBlocking processes all the scheduled stuff
+        assertTrue(src.isClosedForReceive, "Source must be closed")
+    }
+}
\ No newline at end of file