Introduced `consumeEach` for channels and reactive streams, deprecated iteration on reactive streams
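
Rationale, as a minimal stdlib-only sketch. It assumes nothing about the real channel classes: `FakeSubscription`, `consumeEachSketch` and `demo` are hypothetical names used only for illustration. A plain `for` loop that throws midway never reaches `close`, while a loop wrapped in `use` (the shape the new `consumeEach` has in this change) closes the resource on the way out:

```kotlin
import java.io.Closeable

// Hypothetical stand-in for an opened subscription; NOT a library type.
class FakeSubscription(private val items: List<Int>) : Closeable, Iterable<Int> {
    var closed = false
        private set

    override fun iterator(): Iterator<Int> = items.iterator()

    override fun close() {
        closed = true
    }
}

// Hypothetical helper mirroring the shape of consumeEach: iterate inside `use`.
fun FakeSubscription.consumeEachSketch(action: (Int) -> Unit) {
    use { sub -> for (x in sub) action(x) }
}

// Returns (closed after plain loop, closed after `use`-wrapped loop).
fun demo(): Pair<Boolean, Boolean> {
    val plain = FakeSubscription(listOf(1, 2, 3))
    try {
        for (x in plain) if (x == 2) throw IllegalStateException("boom")
    } catch (e: IllegalStateException) {
        // nobody called close() on `plain`
    }

    val wrapped = FakeSubscription(listOf(1, 2, 3))
    try {
        wrapped.consumeEachSketch { if (it == 2) throw IllegalStateException("boom") }
    } catch (e: IllegalStateException) {
        // `use` already closed `wrapped` before rethrowing
    }
    return Pair(plain.closed, wrapped.closed)
}

fun main(args: Array<String>) {
    val (plainClosed, wrappedClosed) = demo()
    println("plain loop closed: $plainClosed")
    println("use-wrapped loop closed: $wrappedClosed")
}
```

The sketch uses ordinary (non-suspending) functions to stay self-contained; the functions added by this change are `suspend` and operate on subscription channels.
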
diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md
index 0ebb6ca..18005cb 100644
--- a/reactive/coroutines-guide-reactive.md
+++ b/reactive/coroutines-guide-reactive.md
@@ -103,25 +103,23 @@
 
 ```kotlin
 fun main(args: Array<String>) = runBlocking<Unit> {
-    // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+    // create a channel that produces numbers from 1 to 3 with 200ms delays between them
     val source = produce<Int>(context) {
         println("Begin") // mark the beginning of this coroutine in output
-        for (x in 1..6) {
+        for (x in 1..3) {
             delay(200) // wait for 200ms
             send(x) // send number x to the channel
         }
     }
-    // print the first 3 elements from this channel
-    println("First three:")
-    var cnt = 0
-    for (x in source) { // iterate over the source to receive elements from it
-        println(x)
-        if (++cnt >= 3) break // break when 3 elements are printed
+    // print elements from the source
+    println("Elements:")
+    source.consumeEach { // consume elements from it
+        println(it)
     }
-    // print the remaining elements from this source
-    println("Remaining:")
-    for (x in source) { 
-        println(x)
+    // print elements from the source AGAIN
+    println("Again:")
+    source.consumeEach { // consume elements from it
+        println(it)
     }
 }
 ```
@@ -131,21 +129,22 @@
 This code produces the following output: 
 
 ```text
-First three:
+Elements:
 Begin
 1
 2
 3
-Remaining:
-4
-5
-6
+Again:
 ```
 
 <!--- TEST -->
 
-Notice, how "Begin" line was printed just once, because [publish] _coroutine builder_, when it is executed,
-launches one coroutine to produce a stream of elements.
+Notice how the "Begin" line was printed just once, because the [produce] _coroutine builder_, when it is executed,
+launches one coroutine to produce a stream of elements. All the produced elements are consumed
+with the [ReceiveChannel.consumeEach][consumeEach]
+extension function. There is no way to receive the elements from this
+channel again. The channel is closed when the producer coroutine is over, so the second attempt to consume
+it receives nothing.
 
 Let us rewrite this code using [publish] coroutine builder from `kotlinx-coroutines-reactive` module
 instead of [produce] from `kotlinx-coroutines-core` module. The code stays the same, 
@@ -160,26 +159,24 @@
 
 ```kotlin
 fun main(args: Array<String>) = runBlocking<Unit> {
-    // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+    // create a publisher that produces numbers from 1 to 3 with 200ms delays between them
     val source = publish<Int>(context) {  
     //           ^^^^^^^  <---  Difference from the previous examples is here
         println("Begin") // mark the beginning of this coroutine in output
-        for (x in 1..6) {
+        for (x in 1..3) {
             delay(200) // wait for 200ms
             send(x) // send number x to the channel
         }
     }
-    // print the first 3 elements from this channel
-    println("First three:")
-    var cnt = 0
-    for (x in source) { // iterate over the source to receive elements from it
-        println(x)
-        if (++cnt >= 3) break // break when 3 elements are printed
+    // print elements from the source
+    println("Elements:")
+    source.consumeEach { // consume elements from it
+        println(it)
     }
-    // print the remaining elements from this source
-    println("Remaining:")
-    for (x in source) { 
-        println(x)
+    // print elements from the source AGAIN
+    println("Again:")
+    source.consumeEach { // consume elements from it
+        println(it)
     }
 }
 ```
@@ -189,19 +186,16 @@
 Now the output of this code changes to:
 
 ```text
-First three:
+Elements:
 Begin
 1
 2
 3
-Remaining:
+Again:
 Begin
 1
 2
 3
-4
-5
-6
 ```
 
 <!--- TEST -->
@@ -212,44 +206,11 @@
 a different stream of elements, depending on how the corresponding implementation of `Publisher` works.
 
 The [publish] coroutine builder, that is used in the above example, launches a fresh coroutine on each subscription.
-An iteration over an instance of `Publisher` with `for (x in source)` statement 
-opens the channel to this publisher, creating a fresh subscription.
-We have two `for (x in source)` statements in this code and that is why we see "Begin" printed twice. 
+Every [Publisher.consumeEach][org.reactivestreams.Publisher.consumeEach] invocation creates a fresh subscription.
+We have two of them in this code and that is why we see "Begin" printed twice. 
 
 In Rx lingo this is called a _cold_ publisher. Many standard Rx operators produce cold streams, too. We can iterate
-over them from a coroutine, and every iteration produces the same stream of elements as the following
-example shows:
-
-<!--- INCLUDE
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
--->
-
-```kotlin
-fun main(args: Array<String>) = runBlocking<Unit> {
-    val source = Flowable.range(1, 3) // Rx 2.x operator to produce a range of integers
-    // iterate over the source once
-    for (x in source) println("First pass $x")
-    // iterate over the source again
-    for (x in source) println("Second pass $x")
-}
-```
-
-> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt)
-
-The output is:
-
-```text
-First pass 1
-First pass 2
-First pass 3
-Second pass 1
-Second pass 2
-Second pass 3
-```
-
-<!--- TEST -->
+over them from a coroutine, and every subscription produces the same stream of elements.
 
 > Note, that we can replicate the same behaviour that we saw with channels by using Rx 
 [publish](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#publish()) 
@@ -258,22 +219,10 @@
 
 ### Subscription and cancellation
 
-An example in the previous section contains this snippet:
-
-```kotlin
-var cnt = 0
-for (x in source) { // iterate over the source to receive elements from it
-    println(x)
-    if (++cnt >= 3) break // break when 3 elements are printed
-}
-```
-
-<!--- CLEAR -->
-
-It is perfectly fine code for receiving the first three elements from a channel, but it should not be generally used
-with a `Publisher` or with a similar reactive stream. When `source` is a `Publisher`, the
-`for (x in source)` statement is a shortcut for `for (x in source.open())`. Let us take a closer look at 
-what [Publisher.open][org.reactivestreams.Publisher.open] does and how it can be used:
+An example in the previous section uses the `source.consumeEach { ... }` snippet to open a subscription 
+and receive all the elements from it. If we need more control over what to do with 
+the elements that are being received from the channel, we can use [Publisher.open][org.reactivestreams.Publisher.open]
+as shown in the following example:
 
 <!--- INCLUDE
 import io.reactivex.*
@@ -287,16 +236,17 @@
         .doOnSubscribe { println("OnSubscribe") } // provide some insight
         .doFinally { println("Finally") }         // ... into what's going on
     var cnt = 0 
-    val channel = source.open() // open channel to the source
-    for (x in channel) { // iterate over the channel to receive elements from it
-        println(x)
-        if (++cnt >= 3) break // break when 3 elements are printed
+    source.open().use { channel -> // open channel to the source
+        for (x in channel) { // iterate over the channel to receive elements from it
+            println(x)
+            if (++cnt >= 3) break // break when 3 elements are printed
+        }
+        // `use` will close the channel when this block of code is complete
     }
-    channel.close() // close the channel
 }
 ```
 
-> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt)
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt)
 
 It produces the following output:
  
@@ -310,13 +260,16 @@
 
 <!--- TEST -->
  
-Using an explicit `open` we can [close][SubscriptionReceiveChannel.close] the corresponding 
-subscription and unsubscribe from the source. The installed 
+With an explicit `open` we should [close][SubscriptionReceiveChannel.close] the corresponding 
+subscription to unsubscribe from the source. However, instead of invoking `close` explicitly, 
+this code relies on the [use](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html)
+function from Kotlin's standard library.
+The installed 
 [doFinally](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#doFinally(io.reactivex.functions.Action))
-listener prints "Finally" to confirm that this is actually what is happening.
+listener prints "Finally" to confirm that the subscription is actually being closed.
  
 We do not need to use an explicit `close` if iteration is performed over all the items that are emitted 
-by the publisher:
+by the publisher, because the subscription is closed automatically by `consumeEach`:
 
 <!--- INCLUDE
 import io.reactivex.*
@@ -330,11 +283,11 @@
         .doOnSubscribe { println("OnSubscribe") } // provide some insight
         .doFinally { println("Finally") }         // ... into what's going on
     // iterate over the source fully
-    for (x in source) println(x)
+    source.consumeEach { println(it) }
 }
 ```
 
-> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt)
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt)
 
 We get the following output:
 
@@ -352,7 +305,7 @@
 
 Notice, how "Finally" is printed before the last element "5". It happens because our `main` function in this
 example is a coroutine that we start with [runBlocking] coroutine builder.
-Our main coroutine receives on the channel using `for (x in source)` statement.
+Our main coroutine receives on the channel using `source.consumeEach { ... }` expression.
 The main coroutine is _suspended_ while it waits for the source to emit an item.
 When the last item is emitted by `Flowable.range(1, 5)` it
 _resumes_ the main coroutine, which gets dispatched onto the main thread to print this
@@ -403,7 +356,7 @@
 }
 ```
 
-> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt)
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt)
 
 The output of this code nicely illustrates how backpressure works with coroutines:
 
@@ -470,7 +423,7 @@
 
 ```kotlin
 fun main(args: Array<String>) = runBlocking<Unit> {
-    for (x in range(CommonPool, 1, 5)) println(x)
+    range(CommonPool, 1, 5).consumeEach { println(it) }
 }
 ```
 
@@ -505,13 +458,14 @@
 
 ```kotlin
 fun <T, R> Publisher<T>.fusedFilterMap(
-    context: CoroutineContext,      // the context to execute this coroutine in
-    predicate: (T) -> Boolean,      // the filter predicate
-    mapper: (T) -> R                // the mapper function
+    context: CoroutineContext,   // the context to execute this coroutine in
+    predicate: (T) -> Boolean,   // the filter predicate
+    mapper: (T) -> R             // the mapper function
 ) = publish<R>(context) {
-    for (x in this@fusedFilterMap)  // iterate of the source stream 
-        if (predicate(x))           // filter part
-            send(mapper(x))         // map part
+    consumeEach {                // consume the source stream 
+        if (predicate(it))       // filter part
+            send(mapper(it))     // map part
+    }
 }
 ```
 
@@ -527,9 +481,9 @@
 
 ```kotlin
 fun main(args: Array<String>) = runBlocking<Unit> {
-   val result = range(context, 1, 5)
+   range(context, 1, 5)
        .fusedFilterMap(context, { it % 2 == 0}, { "$it is even" })
-   for (x in result) println(x) // print all strings from result
+       .consumeEach { println(it) } // print all the resulting strings
 }
 ```
 
@@ -563,10 +517,10 @@
 
 ```kotlin
 fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
-    this@takeUntil.open().use { thisChannel -> // open channel to Publisher<T>
-        other.open().use { otherChannel ->     // open channel to Publisher<U>
+    this@takeUntil.open().use { thisChannel ->           // explicitly open channel to Publisher<T>
+        other.open().use { otherChannel ->               // explicitly open channel to Publisher<U>
             whileSelect {
-                otherChannel.onReceive { false } // bail out on any received element from `other`
+                otherChannel.onReceive { false }         // bail out on any received element from `other`
                 thisChannel.onReceive { send(it); true } // resend element from this channel and continue
             }
         }
@@ -597,9 +551,9 @@
 
 ```kotlin
 fun main(args: Array<String>) = runBlocking<Unit> {
-    val slowNums = rangeWithInterval(context, 200, 1, 10) // numbers with 200ms interval
-    val stop = rangeWithInterval(context, 500, 1, 10) // the first one after 500ms
-    for (x in slowNums.takeUntil(context, stop)) println(x) // let's test it
+    val slowNums = rangeWithInterval(context, 200, 1, 10)         // numbers with 200ms interval
+    val stop = rangeWithInterval(context, 500, 1, 10)             // the first one after 500ms
+    slowNums.takeUntil(context, stop).consumeEach { println(it) } // let's test it
 }
 ```
 
@@ -631,9 +585,9 @@
 
 ```kotlin
 fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
-  for (pub in this@merge) {      // for each publisher received on the source channel
-      launch(this.context) {     // launch a child coroutine
-          for (x in pub) send(x) // resend all element from this publisher
+  consumeEach { pub ->                 // for each publisher received on the source channel
+      launch(this.context) {           // launch a child coroutine
+          pub.consumeEach { send(it) } // resend all elements from this publisher
       }
   }
 }
@@ -671,7 +625,7 @@
 
 ```kotlin
 fun main(args: Array<String>) = runBlocking<Unit> {
-    for (x in testPub(context).merge(context)) println(x) // print the whole stream
+    testPub(context).merge(context).consumeEach { println(it) } // print the whole stream
 }
 ```
 
@@ -718,9 +672,8 @@
         BiFunction { x, _ -> x })
 
 fun main(args: Array<String>) {
-    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3).subscribe { x ->
-        println("$x on thread ${Thread.currentThread().name}")
-    }
+    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
     Thread.sleep(1000)
 }
 ```
@@ -762,9 +715,7 @@
 
 fun main(args: Array<String>) {
     Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
-        .subscribe { x ->
-            println("$x on thread ${Thread.currentThread().name}")
-        }
+        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
     Thread.sleep(1000)
 }
 ```
@@ -815,9 +766,7 @@
 fun main(args: Array<String>) {
     Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
         .observeOn(Schedulers.computation())                           // <-- THIS LINE IS ADDED
-        .subscribe { x ->
-            println("$x on thread ${Thread.currentThread().name}")
-        }
+        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
     Thread.sleep(1000)
 }
 ```
@@ -857,8 +806,8 @@
         BiFunction { x, _ -> x })
 
 fun main(args: Array<String>) = runBlocking<Unit> {
-    for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
-        println("$x on thread ${Thread.currentThread().name}")
+    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+        .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
 }
 ```
 
@@ -903,8 +852,8 @@
 
 fun main(args: Array<String>) = runBlocking<Unit> {
     val job = launch(Unconfined) { // launch new coroutine in Unconfined context (without its own thread pool)
-        for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
-            println("$x on thread ${Thread.currentThread().name}")
+        rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+            .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
     }
     job.join() // wait for our coroutine to complete
 }
@@ -925,7 +874,7 @@
 
 Note, that [Unconfined] context shall be used with care. It may improve the overall performance on certain tests,
 due to the increased stack-locality of operations and less scheduling overhead, but it also produces deeper stacks 
-and makes it harder to reason about asynchonity of the code that is using it. 
+and makes it harder to reason about asynchronicity of the code that is using it. 
 
 If a coroutine sends an element to a channel, then the thread that invoked the 
 [send][SendChannel.send] may start executing the code of a coroutine with [Unconfined] dispatcher.
@@ -949,6 +898,7 @@
 [Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
 [ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
 [produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
+[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
 [ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html
 [SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
 <!--- INDEX kotlinx.coroutines.experimental.selects -->
@@ -958,6 +908,7 @@
 <!--- DOCS_ROOT reactive/kotlinx-coroutines-reactive/target/dokka/kotlinx-coroutines-reactive -->
 <!--- INDEX kotlinx.coroutines.experimental.reactive -->
 [publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/publish.html
+[org.reactivestreams.Publisher.consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/consume-each.html
 [org.reactivestreams.Publisher.open]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/open.html
 [SubscriptionReceiveChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/-subscription-receive-channel/close.html
 <!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2 -->
diff --git a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Channel.kt
index c4dba88..ad5a0a2 100644
--- a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Channel.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Channel.kt
@@ -50,8 +50,24 @@
  * This is a shortcut for `open().iterator()`. See [open] if you need an ability to manually
  * unsubscribe from the observable.
  */
+
+@Suppress("DeprecatedCallableAddReplaceWith")
+@Deprecated(message =
+    "This iteration operator for `for (x in source) { ... }` loop is deprecated, " +
+    "because it can leave subscriptions unclosed when an exception is thrown. " +
+    "Use `source.consumeEach { x -> ... }`.")
 public operator fun <T> Publisher<T>.iterator() = open().iterator()
 
+/**
+ * Subscribes to this [Publisher] and performs the specified action for each received element.
+ */
+// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
+public suspend fun <T> Publisher<T>.consumeEach(action: suspend (T) -> Unit) {
+    open().use { channel ->
+        for (x in channel) action(x)
+    }
+}
+
 private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Subscriber<T> {
     @Volatile
     @JvmField
diff --git a/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/IntegrationTest.kt
index c5d8ca8..f80fe1c 100644
--- a/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/IntegrationTest.kt
@@ -61,9 +61,7 @@
         assertNSE { pub.awaitLast() }
         assertNSE { pub.awaitSingle() }
         var cnt = 0
-        for (t in pub) {
-            cnt++
-        }
+        pub.consumeEach { cnt++ }
         assertThat(cnt, IsEqual(0))
     }
 
@@ -78,8 +76,8 @@
         assertThat(pub.awaitLast(), IsEqual("OK"))
         assertThat(pub.awaitSingle(), IsEqual("OK"))
         var cnt = 0
-        for (t in pub) {
-            assertThat(t, IsEqual("OK"))
+        pub.consumeEach {
+            assertThat(it, IsEqual("OK"))
             cnt++
         }
         assertThat(cnt, IsEqual(1))
@@ -106,8 +104,8 @@
 
     private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {
         var last = 0
-        for (t in pub) {
-            assertThat(t, IsEqual(++last))
+        pub.consumeEach {
+            assertThat(it, IsEqual(++last))
         }
         assertThat(last, IsEqual(n))
     }
diff --git a/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherCompletionStressTest.kt b/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherCompletionStressTest.kt
index a5948b9..a1f5fa1 100644
--- a/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherCompletionStressTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherCompletionStressTest.kt
@@ -39,7 +39,7 @@
             runBlocking {
                 withTimeout(5000) {
                     var received = 0
-                    for (x in range(CommonPool, 1, count)) {
+                    range(CommonPool, 1, count).consumeEach { x ->
                         received++
                         if (x != received) error("$x != $received")
                     }
diff --git a/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherMultiTest.kt b/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherMultiTest.kt
index d3aa13b..954aae1 100644
--- a/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherMultiTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherMultiTest.kt
@@ -43,8 +43,8 @@
             jobs.forEach { it.join() }
         }
         val resultSet = mutableSetOf<Int>()
-        for (t in observable) {
-            assertTrue(resultSet.add(t))
+        observable.consumeEach {
+            assertTrue(resultSet.add(it))
         }
         assertThat(resultSet.size, IsEqual(n))
     }
diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt
index 6921be3..06bd13e 100644
--- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt
@@ -53,8 +53,23 @@
  * This is a shortcut for `open().iterator()`. See [open] if you need an ability to manually
  * unsubscribe from the observable.
  */
+@Suppress("DeprecatedCallableAddReplaceWith")
+@Deprecated(message =
+    "This iteration operator for `for (x in source) { ... }` loop is deprecated, " +
+    "because it can leave subscriptions unclosed when an exception is thrown. " +
+    "Use `source.consumeEach { x -> ... }`.")
 public operator fun <T> Observable<T>.iterator() = open().iterator()
 
+/**
+ * Subscribes to this [Observable] and performs the specified action for each received element.
+ */
+// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
+public suspend fun <T> Observable<T>.consumeEach(action: suspend (T) -> Unit) {
+    open().use { channel ->
+        for (x in channel) action(x)
+    }
+}
+
 private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T> {
     @JvmField
     val subscriber: ChannelSubscriber = ChannelSubscriber()
diff --git a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/IntegrationTest.kt
index 491250a..410c38d 100644
--- a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/IntegrationTest.kt
@@ -52,16 +52,16 @@
 
     @Test
     fun testEmpty(): Unit = runBlocking {
-        val pub = rxObservable<String>(ctx(context)) {
+        val observable = rxObservable<String>(ctx(context)) {
             if (delay) delay(1)
             // does not send anything
         }
-        assertNSE { pub.awaitFirst() }
-        assertThat(pub.awaitFirstOrDefault("OK"), IsEqual("OK"))
-        assertNSE { pub.awaitLast() }
-        assertNSE { pub.awaitSingle() }
+        assertNSE { observable.awaitFirst() }
+        assertThat(observable.awaitFirstOrDefault("OK"), IsEqual("OK"))
+        assertNSE { observable.awaitLast() }
+        assertNSE { observable.awaitSingle() }
         var cnt = 0
-        for (t in pub) {
+        observable.consumeEach {
             cnt++
         }
         assertThat(cnt, IsEqual(0))
@@ -77,8 +77,8 @@
         assertThat(observable.awaitLast(), IsEqual("OK"))
         assertThat(observable.awaitSingle(), IsEqual("OK"))
         var cnt = 0
-        for (t in observable) {
-            assertThat(t, IsEqual("OK"))
+        observable.consumeEach {
+            assertThat(it, IsEqual("OK"))
             cnt++
         }
         assertThat(cnt, IsEqual(1))
@@ -104,8 +104,8 @@
 
     private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
         var last = 0
-        for (t in observable) {
-            assertThat(t, IsEqual(++last))
+        observable.consumeEach {
+            assertThat(it, IsEqual(++last))
         }
         assertThat(last, IsEqual(n))
     }
diff --git a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableCompletionStressTest.kt b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableCompletionStressTest.kt
index acc6479..bd2d915 100644
--- a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableCompletionStressTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableCompletionStressTest.kt
@@ -39,7 +39,7 @@
             runBlocking {
                 withTimeout(5000) {
                     var received = 0
-                    for (x in range(CommonPool, 1, count)) {
+                    range(CommonPool, 1, count).consumeEach { x ->
                         received++
                         if (x != received) error("$x != $received")
                     }
diff --git a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableMultiTest.kt
index 5628c32..427b207 100644
--- a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableMultiTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableMultiTest.kt
@@ -63,8 +63,7 @@
     fun testIteratorResendUnconfined() {
         val n = 10_000 * stressTestMultiplier
         val observable = rxObservable(Unconfined) {
-            for (x in Observable.range(0, n))
-                send(x)
+            Observable.range(0, n).consumeEach { send(it) }
         }
         checkSingleValue(observable.toList()) { list ->
             assertEquals((0..n - 1).toList(), list)
@@ -75,8 +74,7 @@
     fun testIteratorResendPool() {
         val n = 10_000 * stressTestMultiplier
         val observable = rxObservable(CommonPool) {
-            for (x in Observable.range(0, n))
-                send(x)
+            Observable.range(0, n).consumeEach { send(it) }
         }
         checkSingleValue(observable.toList()) { list ->
             assertEquals((0..n - 1).toList(), list)
diff --git a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableSingleTest.kt
index 26f1221..873810d 100644
--- a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableSingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableSingleTest.kt
@@ -163,8 +163,7 @@
     fun testObservableIteration() {
         val observable = rxObservable(CommonPool) {
             var result = ""
-            for (x in Observable.just("O", "K"))
-                result += x
+            Observable.just("O", "K").consumeEach { result += it }
             send(result)
         }
 
@@ -177,8 +176,7 @@
     fun testObservableIterationFailure() {
         val observable = rxObservable(CommonPool) {
             try {
-                for (x in Observable.error<String>(RuntimeException("OK")))
-                    fail("Should not be here")
+                Observable.error<String>(RuntimeException("OK")).consumeEach { fail("Should not be here") }
                 send("Fail")
             } catch (e: RuntimeException) {
                 send(e.message!!)
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt
index 5da80bc..e58116c 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt
@@ -25,7 +25,7 @@
 import java.io.Closeable
 
 /**
- * Return type for [Observable.open] that can be used to [receive] elements from the
+ * Return type for [ObservableSource.open] that can be used to [receive] elements from the
  * subscription and to manually [close] it.
  */
 public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
@@ -36,7 +36,7 @@
 }
 
 /**
- * Subscribes to this [Observable] and returns a channel to receive elements emitted by it.
+ * Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it.
  */
 public fun <T> ObservableSource<T>.open(): SubscriptionReceiveChannel<T> {
     val channel = SubscriptionChannel<T>()
@@ -50,8 +50,23 @@
  * This is a shortcut for `open().iterator()`. See [open] if you need an ability to manually
  * unsubscribe from the observable.
  */
+@Suppress("DeprecatedCallableAddReplaceWith")
+@Deprecated(message =
+"This iteration operator for the `for (x in source) { ... }` loop is deprecated, " +
+    "because it can leave subscriptions unclosed when an exception is thrown. " +
+    "Use `source.consumeEach { x -> ... }` instead.")
 public operator fun <T> ObservableSource<T>.iterator() = open().iterator()
 
+/**
+ * Subscribes to this [ObservableSource] and performs the specified action for each received element.
+ */
+// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
+public suspend fun <T> ObservableSource<T>.consumeEach(action: suspend (T) -> Unit) {
+    open().use { channel ->
+        for (x in channel) action(x)
+    }
+}
+
 private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Observer<T> {
     @Volatile
     var subscription: Disposable? = null
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt
index 8a2acb9..b7423e3 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt
@@ -17,28 +17,28 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.basic.example01
 
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.channels.consumeEach
+import kotlinx.coroutines.experimental.channels.produce
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.runBlocking
 
 fun main(args: Array<String>) = runBlocking<Unit> {
-    // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+    // create a channel that produces numbers from 1 to 3 with 200ms delays between them
     val source = produce<Int>(context) {
         println("Begin") // mark the beginning of this coroutine in output
-        for (x in 1..6) {
+        for (x in 1..3) {
             delay(200) // wait for 200ms
             send(x) // send number x to the channel
         }
     }
-    // print the first 3 elements from this channel
-    println("First three:")
-    var cnt = 0
-    for (x in source) { // iterate over the source to receive elements from it
-        println(x)
-        if (++cnt >= 3) break // break when 3 elements are printed
+    // print elements from the source
+    println("Elements:")
+    source.consumeEach { // consume elements from it
+        println(it)
     }
-    // print the remaining elements from this source
-    println("Remaining:")
-    for (x in source) { 
-        println(x)
+    // print elements from the source AGAIN
+    println("Again:")
+    source.consumeEach { // consume elements from it
+        println(it)
     }
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt
index 5c31b3e..5f9d9d0 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt
@@ -17,29 +17,29 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.basic.example02
 
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
 
 fun main(args: Array<String>) = runBlocking<Unit> {
-    // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+    // create a publisher that produces numbers from 1 to 3 with 200ms delays between them
     val source = publish<Int>(context) {  
     //           ^^^^^^^  <---  Difference from the previous examples is here
         println("Begin") // mark the beginning of this coroutine in output
-        for (x in 1..6) {
+        for (x in 1..3) {
             delay(200) // wait for 200ms
             send(x) // send number x to the channel
         }
     }
-    // print the first 3 elements from this channel
-    println("First three:")
-    var cnt = 0
-    for (x in source) { // iterate over the source to receive elements from it
-        println(x)
-        if (++cnt >= 3) break // break when 3 elements are printed
+    // print elements from the source
+    println("Elements:")
+    source.consumeEach { // consume elements from it
+        println(it)
     }
-    // print the remaining elements from this source
-    println("Remaining:")
-    for (x in source) { 
-        println(x)
+    // print elements from the source AGAIN
+    println("Again:")
+    source.consumeEach { // consume elements from it
+        println(it)
     }
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt
index 95c098d..b0f0068 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt
@@ -17,14 +17,20 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.basic.example03
 
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import kotlinx.coroutines.experimental.reactive.open
+import kotlinx.coroutines.experimental.runBlocking
 
 fun main(args: Array<String>) = runBlocking<Unit> {
-    val source = Flowable.range(1, 3) // Rx 2.x operator to produce a range of integers
-    // iterate over the source once
-    for (x in source) println("First pass $x")
-    // iterate over the source again
-    for (x in source) println("Second pass $x")
+    val source = Flowable.range(1, 5) // a range of five numbers
+        .doOnSubscribe { println("OnSubscribe") } // provide some insight
+        .doFinally { println("Finally") }         // ... into what's going on
+    var cnt = 0 
+    source.open().use { channel -> // open channel to the source
+        for (x in channel) { // iterate over the channel to receive elements from it
+            println(x)
+            if (++cnt >= 3) break // break when 3 elements are printed
+        }
+        // `use` will close the channel when this block of code is complete
+    }
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt
index b00da76..e41d8dc 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt
@@ -17,19 +17,14 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.basic.example04
 
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.runBlocking
 
 fun main(args: Array<String>) = runBlocking<Unit> {
     val source = Flowable.range(1, 5) // a range of five numbers
         .doOnSubscribe { println("OnSubscribe") } // provide some insight
         .doFinally { println("Finally") }         // ... into what's going on
-    var cnt = 0 
-    val channel = source.open() // open channel to the source
-    for (x in channel) { // iterate over the channel to receive elements from it
-        println(x)
-        if (++cnt >= 3) break // break when 3 elements are printed
-    }
-    channel.close() // close the channel
+    // iterate over the source fully
+    source.consumeEach { println(it) }
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt
index 8fe2b5b..23c6e04 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt
@@ -17,14 +17,26 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.basic.example05
 
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.runBlocking
+import kotlinx.coroutines.experimental.rx2.rxFlowable
 
-fun main(args: Array<String>) = runBlocking<Unit> {
-    val source = Flowable.range(1, 5) // a range of five numbers
-        .doOnSubscribe { println("OnSubscribe") } // provide some insight
-        .doFinally { println("Finally") }         // ... into what's going on
-    // iterate over the source fully
-    for (x in source) println(x)
+fun main(args: Array<String>) = runBlocking<Unit> { 
+    // coroutine -- fast producer of elements in the context of the main thread
+    val source = rxFlowable(context) {
+        for (x in 1..5) {
+            println("Sending $x ...")
+            send(x) // this is a suspending function
+        }
+    }
+    // subscribe on another thread with a slow subscriber using Rx
+    source
+        .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
+        .doOnComplete { println("Complete") }
+        .subscribe { x ->
+            println("Received $x")
+            Thread.sleep(200) // 200 ms to process each item
+        }
+    delay(2000) // suspend main thread for a couple of seconds
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt
deleted file mode 100644
index 56e75da..0000000
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2016-2017 JetBrains s.r.o.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
-package guide.reactive.basic.example06
-
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.rx2.rxFlowable
-import io.reactivex.schedulers.Schedulers
-
-fun main(args: Array<String>) = runBlocking<Unit> { 
-    // coroutine -- fast producer of elements in the context of the main thread
-    val source = rxFlowable(context) {
-        for (x in 1..5) {
-            println("Sending $x ...")
-            send(x) // this is a suspending function
-        }
-    }
-    // subscribe on another thread with a slow subscriber using Rx
-    source
-        .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
-        .doOnComplete { println("Complete") }
-        .subscribe { x ->
-            println("Received $x")
-            Thread.sleep(200) // 200 ms to process each item
-        }
-    delay(2000) // suspend main thread for couple of seconds
-}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt
index 02c94c4..adaede0 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt
@@ -17,7 +17,8 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.context.example01
 
-import io.reactivex.*
+import io.reactivex.Flowable
+import io.reactivex.Scheduler
 import io.reactivex.functions.BiFunction
 import io.reactivex.schedulers.Schedulers
 import java.util.concurrent.TimeUnit
@@ -29,8 +30,7 @@
         BiFunction { x, _ -> x })
 
 fun main(args: Array<String>) {
-    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3).subscribe { x ->
-        println("$x on thread ${Thread.currentThread().name}")
-    }
+    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
     Thread.sleep(1000)
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt
index b26ee52..4e551a5 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt
@@ -17,9 +17,10 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.context.example02
 
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.reactive.publish
 import kotlin.coroutines.experimental.CoroutineContext
 
 fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
@@ -31,8 +32,6 @@
 
 fun main(args: Array<String>) {
     Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
-        .subscribe { x ->
-            println("$x on thread ${Thread.currentThread().name}")
-        }
+        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
     Thread.sleep(1000)
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt
index 6534fa9..20ff91c 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt
@@ -17,10 +17,11 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.context.example03
 
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
 import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.reactive.publish
 import kotlin.coroutines.experimental.CoroutineContext
 
 fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
@@ -33,8 +34,6 @@
 fun main(args: Array<String>) {
     Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
         .observeOn(Schedulers.computation())                           // <-- THIS LINE IS ADDED
-        .subscribe { x ->
-            println("$x on thread ${Thread.currentThread().name}")
-        }
+        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
     Thread.sleep(1000)
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt
index 91e72dc..0f55479 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt
@@ -17,11 +17,12 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.context.example04
 
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import io.reactivex.Scheduler
 import io.reactivex.functions.BiFunction
 import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.runBlocking
 import java.util.concurrent.TimeUnit
 
 fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
@@ -31,6 +32,6 @@
         BiFunction { x, _ -> x })
 
 fun main(args: Array<String>) = runBlocking<Unit> {
-    for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
-        println("$x on thread ${Thread.currentThread().name}")
+    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+        .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt
index 8b3dbfd..c9e493e 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt
@@ -17,11 +17,14 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.context.example05
 
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import io.reactivex.Scheduler
 import io.reactivex.functions.BiFunction
 import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.runBlocking
 import java.util.concurrent.TimeUnit
 
 fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
@@ -32,8 +35,8 @@
 
 fun main(args: Array<String>) = runBlocking<Unit> {
     val job = launch(Unconfined) { // launch new coroutine in Unconfined context (without its own thread pool)
-        for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
-            println("$x on thread ${Thread.currentThread().name}")
+        rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+            .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
     }
     job.join() // wait for our coroutine to complete
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt
index 217a7ef..d4ce0e0 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt
@@ -17,8 +17,10 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.operators.example01
 
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
 import kotlin.coroutines.experimental.CoroutineContext
 
 fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
@@ -26,5 +28,5 @@
 }
 
 fun main(args: Array<String>) = runBlocking<Unit> {
-    for (x in range(CommonPool, 1, 5)) println(x)
+    range(CommonPool, 1, 5).consumeEach { println(it) }
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt
index 7cb3484..fc34009 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt
@@ -17,19 +17,21 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.operators.example02
 
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
 import org.reactivestreams.Publisher
 import kotlin.coroutines.experimental.CoroutineContext
 
 fun <T, R> Publisher<T>.fusedFilterMap(
-    context: CoroutineContext,      // the context to execute this coroutine in
-    predicate: (T) -> Boolean,      // the filter predicate
-    mapper: (T) -> R                // the mapper function
+    context: CoroutineContext,   // the context to execute this coroutine in
+    predicate: (T) -> Boolean,   // the filter predicate
+    mapper: (T) -> R             // the mapper function
 ) = publish<R>(context) {
-    for (x in this@fusedFilterMap)  // iterate of the source stream 
-        if (predicate(x))           // filter part
-            send(mapper(x))         // map part
+    consumeEach {                // consume the source stream
+        if (predicate(it))       // filter part
+            send(mapper(it))     // map part
+    }
 }
 
 fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
@@ -37,7 +39,7 @@
 }
 
 fun main(args: Array<String>) = runBlocking<Unit> {
-   val result = range(context, 1, 5)
+   range(context, 1, 5)
        .fusedFilterMap(context, { it % 2 == 0}, { "$it is even" })
-   for (x in result) println(x) // print all strings from result
+       .consumeEach { println(it) } // print all the resulting strings
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt
index 2ece476..bbf97fd 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt
@@ -17,17 +17,20 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.operators.example03
 
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.open
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
+import kotlinx.coroutines.experimental.selects.whileSelect
 import org.reactivestreams.Publisher
 import kotlin.coroutines.experimental.CoroutineContext
-import kotlinx.coroutines.experimental.selects.whileSelect
 
 fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
-    this@takeUntil.open().use { thisChannel -> // open channel to Publisher<T>
-        other.open().use { otherChannel ->     // open channel to Publisher<U>
+    this@takeUntil.open().use { thisChannel ->           // explicitly open channel to Publisher<T>
+        other.open().use { otherChannel ->               // explicitly open channel to Publisher<U>
             whileSelect {
-                otherChannel.onReceive { false } // bail out on any received element from `other`
+                otherChannel.onReceive { false }         // bail out on any received element from `other`
                 thisChannel.onReceive { send(it); true } // resend element from this channel and continue
             }
         }
@@ -42,7 +45,7 @@
 }
 
 fun main(args: Array<String>) = runBlocking<Unit> {
-    val slowNums = rangeWithInterval(context, 200, 1, 10) // numbers with 200ms interval
-    val stop = rangeWithInterval(context, 500, 1, 10) // the first one after 500ms
-    for (x in slowNums.takeUntil(context, stop)) println(x) // let's test it
+    val slowNums = rangeWithInterval(context, 200, 1, 10)         // numbers with 200ms interval
+    val stop = rangeWithInterval(context, 500, 1, 10)             // the first one after 500ms
+    slowNums.takeUntil(context, stop).consumeEach { println(it) } // let's test it
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt
index 76eb713..ad3ebe3 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt
@@ -17,15 +17,18 @@
 // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
 package guide.reactive.operators.example04
 
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
 import org.reactivestreams.Publisher
 import kotlin.coroutines.experimental.CoroutineContext
 
 fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
-  for (pub in this@merge) {      // for each publisher received on the source channel
-      launch(this.context) {     // launch a child coroutine
-          for (x in pub) send(x) // resend all element from this publisher
+  consumeEach { pub ->                 // for each publisher received on the source channel
+      launch(this.context) {           // launch a child coroutine
+          pub.consumeEach { send(it) } // resend all elements from this publisher
       }
   }
 }
@@ -45,5 +48,5 @@
 }
 
 fun main(args: Array<String>) = runBlocking<Unit> {
-    for (x in testPub(context).merge(context)) println(x) // print the whole stream
+    testPub(context).merge(context).consumeEach { println(it) } // print the whole stream
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
index 6ce61be..4f26b56 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
@@ -8,52 +8,34 @@
     @Test
     fun testGuideReactiveBasicExample01() {
         test { guide.reactive.basic.example01.main(emptyArray()) }.verifyLines(
-            "First three:",
+            "Elements:",
             "Begin",
             "1",
             "2",
             "3",
-            "Remaining:",
-            "4",
-            "5",
-            "6"
+            "Again:"
         )
     }
 
     @Test
     fun testGuideReactiveBasicExample02() {
         test { guide.reactive.basic.example02.main(emptyArray()) }.verifyLines(
-            "First three:",
+            "Elements:",
             "Begin",
             "1",
             "2",
             "3",
-            "Remaining:",
+            "Again:",
             "Begin",
             "1",
             "2",
-            "3",
-            "4",
-            "5",
-            "6"
+            "3"
         )
     }
 
     @Test
     fun testGuideReactiveBasicExample03() {
         test { guide.reactive.basic.example03.main(emptyArray()) }.verifyLines(
-            "First pass 1",
-            "First pass 2",
-            "First pass 3",
-            "Second pass 1",
-            "Second pass 2",
-            "Second pass 3"
-        )
-    }
-
-    @Test
-    fun testGuideReactiveBasicExample04() {
-        test { guide.reactive.basic.example04.main(emptyArray()) }.verifyLines(
             "OnSubscribe",
             "1",
             "2",
@@ -63,8 +45,8 @@
     }
 
     @Test
-    fun testGuideReactiveBasicExample05() {
-        test { guide.reactive.basic.example05.main(emptyArray()) }.verifyLines(
+    fun testGuideReactiveBasicExample04() {
+        test { guide.reactive.basic.example04.main(emptyArray()) }.verifyLines(
             "OnSubscribe",
             "1",
             "2",
@@ -76,8 +58,8 @@
     }
 
     @Test
-    fun testGuideReactiveBasicExample06() {
-        test { guide.reactive.basic.example06.main(emptyArray()) }.verifyLines(
+    fun testGuideReactiveBasicExample05() {
+        test { guide.reactive.basic.example05.main(emptyArray()) }.verifyLines(
             "Sending 1 ...",
             "Sending 2 ...",
             "Received 1",
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt
index 6a0a0d1..fdd10c1 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt
@@ -52,16 +52,16 @@
 
     @Test
     fun testEmpty(): Unit = runBlocking {
-        val pub = rxObservable<String>(ctx(context)) {
+        val observable = rxObservable<String>(ctx(context)) {
             if (delay) delay(1)
             // does not send anything
         }
-        assertNSE { pub.awaitFirst() }
-        assertThat(pub.awaitFirstOrDefault("OK"), IsEqual("OK"))
-        assertNSE { pub.awaitLast() }
-        assertNSE { pub.awaitSingle() }
+        assertNSE { observable.awaitFirst() }
+        assertThat(observable.awaitFirstOrDefault("OK"), IsEqual("OK"))
+        assertNSE { observable.awaitLast() }
+        assertNSE { observable.awaitSingle() }
         var cnt = 0
-        for (t in pub) {
+        observable.consumeEach {
             cnt++
         }
         assertThat(cnt, IsEqual(0))
@@ -77,8 +77,8 @@
         assertThat(observable.awaitLast(), IsEqual("OK"))
         assertThat(observable.awaitSingle(), IsEqual("OK"))
         var cnt = 0
-        for (t in observable) {
-            assertThat(t, IsEqual("OK"))
+        observable.consumeEach {
+            assertThat(it, IsEqual("OK"))
             cnt++
         }
         assertThat(cnt, IsEqual(1))
@@ -104,8 +104,8 @@
 
     private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
         var last = 0
-        for (t in observable) {
-            assertThat(t, IsEqual(++last))
+        observable.consumeEach {
+            assertThat(it, IsEqual(++last))
         }
         assertThat(last, IsEqual(n))
     }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableCompletionStressTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableCompletionStressTest.kt
index 2af86ce..bf61ba1 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableCompletionStressTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableCompletionStressTest.kt
@@ -14,13 +14,11 @@
  * limitations under the License.
  */
 
-package kotlinx.coroutines.experimental.rx1
+package kotlinx.coroutines.experimental.rx2
 
 import kotlinx.coroutines.experimental.CommonPool
 import kotlinx.coroutines.experimental.TestBase
 import kotlinx.coroutines.experimental.runBlocking
-import kotlinx.coroutines.experimental.rx2.iterator
-import kotlinx.coroutines.experimental.rx2.rxObservable
 import kotlinx.coroutines.experimental.withTimeout
 import org.junit.Test
 import java.util.*
@@ -41,7 +39,7 @@
             runBlocking {
                 withTimeout(5000) {
                     var received = 0
-                    for (x in range(CommonPool, 1, count)) {
+                    range(CommonPool, 1, count).consumeEach { x ->
                         received++
                         if (x != received) error("$x != $received")
                     }
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableMultiTest.kt
index bf5503a..0f36404 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableMultiTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableMultiTest.kt
@@ -16,13 +16,13 @@
 
 package kotlinx.coroutines.experimental.rx2
 
+import io.reactivex.Observable
 import kotlinx.coroutines.experimental.CommonPool
 import kotlinx.coroutines.experimental.TestBase
 import kotlinx.coroutines.experimental.Unconfined
 import kotlinx.coroutines.experimental.launch
 import org.junit.Assert.assertEquals
 import org.junit.Test
-import io.reactivex.Observable
 import java.io.IOException
 
 /**
@@ -63,8 +63,7 @@
     fun testIteratorResendUnconfined() {
         val n = 10_000 * stressTestMultiplier
         val observable = rxObservable(Unconfined) {
-            for (x in Observable.range(0, n))
-                send(x)
+            Observable.range(0, n).consumeEach { send(it) }
         }
         checkSingleValue(observable.toList()) { list ->
             assertEquals((0..n - 1).toList(), list)
@@ -75,8 +74,7 @@
     fun testIteratorResendPool() {
         val n = 10_000 * stressTestMultiplier
         val observable = rxObservable(CommonPool) {
-            for (x in Observable.range(0, n))
-                send(x)
+            Observable.range(0, n).consumeEach { send(it) }
         }
         checkSingleValue(observable.toList()) { list ->
             assertEquals((0..n - 1).toList(), list)
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableSingleTest.kt
index b9b45ff..a20a6be 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableSingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableSingleTest.kt
@@ -152,8 +152,7 @@
     fun testObservableIteration() {
         val observable = rxObservable(CommonPool) {
             var result = ""
-            for (x in Observable.just("O", "K"))
-                result += x
+            Observable.just("O", "K").consumeEach { result += it }
             send(result)
         }
 
@@ -166,8 +165,7 @@
     fun testObservableIterationFailure() {
         val observable = rxObservable(CommonPool) {
             try {
-                for (x in Observable.error<String>(RuntimeException("OK")))
-                    fail("Should not be here")
+                Observable.error<String>(RuntimeException("OK")).consumeEach { fail("Should not be here") }
                 send("Fail")
             } catch (e: RuntimeException) {
                 send(e.message!!)