Introduced `consumeEach` for channels and reactive streams, deprecated iteration on reactive streams
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt
index 8a2acb9..b7423e3 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt
@@ -17,28 +17,28 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.basic.example01
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.channels.consumeEach
+import kotlinx.coroutines.experimental.channels.produce
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.runBlocking
fun main(args: Array<String>) = runBlocking<Unit> {
- // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+ // create a channel that produces numbers from 1 to 3 with 200ms delays between them
val source = produce<Int>(context) {
println("Begin") // mark the beginning of this coroutine in output
- for (x in 1..6) {
+ for (x in 1..3) {
delay(200) // wait for 200ms
send(x) // send number x to the channel
}
}
- // print the first 3 elements from this channel
- println("First three:")
- var cnt = 0
- for (x in source) { // iterate over the source to receive elements from it
- println(x)
- if (++cnt >= 3) break // break when 3 elements are printed
+ // print elements from the source
+ println("Elements:")
+ source.consumeEach { // consume elements from it
+ println(it)
}
- // print the remaining elements from this source
- println("Remaining:")
- for (x in source) {
- println(x)
+ // print elements from the source AGAIN
+ println("Again:")
+ source.consumeEach { // consume elements from it
+ println(it)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt
index 5c31b3e..5f9d9d0 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt
@@ -17,29 +17,29 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.basic.example02
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
fun main(args: Array<String>) = runBlocking<Unit> {
- // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+ // create a publisher that produces numbers from 1 to 3 with 200ms delays between them
val source = publish<Int>(context) {
// ^^^^^^^ <--- Difference from the previous examples is here
println("Begin") // mark the beginning of this coroutine in output
- for (x in 1..6) {
+ for (x in 1..3) {
delay(200) // wait for 200ms
send(x) // send number x to the channel
}
}
- // print the first 3 elements from this channel
- println("First three:")
- var cnt = 0
- for (x in source) { // iterate over the source to receive elements from it
- println(x)
- if (++cnt >= 3) break // break when 3 elements are printed
+ // print elements from the source
+ println("Elements:")
+ source.consumeEach { // consume elements from it
+ println(it)
}
- // print the remaining elements from this source
- println("Remaining:")
- for (x in source) {
- println(x)
+ // print elements from the source AGAIN
+ println("Again:")
+ source.consumeEach { // consume elements from it
+ println(it)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt
index 95c098d..b0f0068 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt
@@ -17,14 +17,20 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.basic.example03
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import kotlinx.coroutines.experimental.reactive.open
+import kotlinx.coroutines.experimental.runBlocking
fun main(args: Array<String>) = runBlocking<Unit> {
- val source = Flowable.range(1, 3) // Rx 2.x operator to produce a range of integers
- // iterate over the source once
- for (x in source) println("First pass $x")
- // iterate over the source again
- for (x in source) println("Second pass $x")
+ val source = Flowable.range(1, 5) // a range of five numbers
+ .doOnSubscribe { println("OnSubscribe") } // provide some insight
+ .doFinally { println("Finally") } // ... into what's going on
+ var cnt = 0
+ source.open().use { channel -> // open channel to the source
+ for (x in channel) { // iterate over the channel to receive elements from it
+ println(x)
+ if (++cnt >= 3) break // break when 3 elements are printed
+ }
+ // `use` will close the channel when this block of code is complete
+ }
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt
index b00da76..e41d8dc 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt
@@ -17,19 +17,14 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.basic.example04
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.runBlocking
fun main(args: Array<String>) = runBlocking<Unit> {
val source = Flowable.range(1, 5) // a range of five numbers
.doOnSubscribe { println("OnSubscribe") } // provide some insight
.doFinally { println("Finally") } // ... into what's going on
- var cnt = 0
- val channel = source.open() // open channel to the source
- for (x in channel) { // iterate over the channel to receive elements from it
- println(x)
- if (++cnt >= 3) break // break when 3 elements are printed
- }
- channel.close() // close the channel
+ // iterate over the source fully
+ source.consumeEach { println(it) }
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt
index 8fe2b5b..23c6e04 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt
@@ -17,14 +17,26 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.basic.example05
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.runBlocking
+import kotlinx.coroutines.experimental.rx2.rxFlowable
-fun main(args: Array<String>) = runBlocking<Unit> {
- val source = Flowable.range(1, 5) // a range of five numbers
- .doOnSubscribe { println("OnSubscribe") } // provide some insight
- .doFinally { println("Finally") } // ... into what's going on
- // iterate over the source fully
- for (x in source) println(x)
+fun main(args: Array<String>) = runBlocking<Unit> {
+ // coroutine -- fast producer of elements in the context of the main thread
+ val source = rxFlowable(context) {
+ for (x in 1..5) {
+ println("Sending $x ...")
+ send(x) // this is a suspending function
+ }
+ }
+ // subscribe on another thread with a slow subscriber using Rx
+ source
+ .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
+ .doOnComplete { println("Complete") }
+ .subscribe { x ->
+ println("Received $x")
+ Thread.sleep(200) // 200 ms to process each item
+ }
+ delay(2000) // suspend main thread for a couple of seconds
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt
deleted file mode 100644
index 56e75da..0000000
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2016-2017 JetBrains s.r.o.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
-package guide.reactive.basic.example06
-
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.rx2.rxFlowable
-import io.reactivex.schedulers.Schedulers
-
-fun main(args: Array<String>) = runBlocking<Unit> {
- // coroutine -- fast producer of elements in the context of the main thread
- val source = rxFlowable(context) {
- for (x in 1..5) {
- println("Sending $x ...")
- send(x) // this is a suspending function
- }
- }
- // subscribe on another thread with a slow subscriber using Rx
- source
- .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
- .doOnComplete { println("Complete") }
- .subscribe { x ->
- println("Received $x")
- Thread.sleep(200) // 200 ms to process each item
- }
- delay(2000) // suspend main thread for couple of seconds
-}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt
index 02c94c4..adaede0 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt
@@ -17,7 +17,8 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.context.example01
-import io.reactivex.*
+import io.reactivex.Flowable
+import io.reactivex.Scheduler
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit
@@ -29,8 +30,7 @@
BiFunction { x, _ -> x })
fun main(args: Array<String>) {
- rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3).subscribe { x ->
- println("$x on thread ${Thread.currentThread().name}")
- }
+ rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+ .subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt
index b26ee52..4e551a5 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt
@@ -17,9 +17,10 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.context.example02
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.reactive.publish
import kotlin.coroutines.experimental.CoroutineContext
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
@@ -31,8 +32,6 @@
fun main(args: Array<String>) {
Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
- .subscribe { x ->
- println("$x on thread ${Thread.currentThread().name}")
- }
+ .subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt
index 6534fa9..20ff91c 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt
@@ -17,10 +17,11 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.context.example03
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.reactive.publish
import kotlin.coroutines.experimental.CoroutineContext
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
@@ -33,8 +34,6 @@
fun main(args: Array<String>) {
Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
.observeOn(Schedulers.computation()) // <-- THIS LINE IS ADDED
- .subscribe { x ->
- println("$x on thread ${Thread.currentThread().name}")
- }
+ .subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt
index 91e72dc..0f55479 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt
@@ -17,11 +17,12 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.context.example04
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import io.reactivex.Scheduler
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.TimeUnit
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
@@ -31,6 +32,6 @@
BiFunction { x, _ -> x })
fun main(args: Array<String>) = runBlocking<Unit> {
- for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
- println("$x on thread ${Thread.currentThread().name}")
+ rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+ .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt
index 8b3dbfd..c9e493e 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt
@@ -17,11 +17,14 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.context.example05
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import io.reactivex.Scheduler
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.TimeUnit
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
@@ -32,8 +35,8 @@
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch(Unconfined) { // launch new coroutine in Unconfined context (without its own thread pool)
- for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
- println("$x on thread ${Thread.currentThread().name}")
+ rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+ .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
}
job.join() // wait for our coroutine to complete
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt
index 217a7ef..d4ce0e0 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt
@@ -17,8 +17,10 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.operators.example01
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
import kotlin.coroutines.experimental.CoroutineContext
fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
@@ -26,5 +28,5 @@
}
fun main(args: Array<String>) = runBlocking<Unit> {
- for (x in range(CommonPool, 1, 5)) println(x)
+ range(CommonPool, 1, 5).consumeEach { println(it) }
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt
index 7cb3484..fc34009 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt
@@ -17,19 +17,21 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.operators.example02
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
import org.reactivestreams.Publisher
import kotlin.coroutines.experimental.CoroutineContext
fun <T, R> Publisher<T>.fusedFilterMap(
- context: CoroutineContext, // the context to execute this coroutine in
- predicate: (T) -> Boolean, // the filter predicate
- mapper: (T) -> R // the mapper function
+ context: CoroutineContext, // the context to execute this coroutine in
+ predicate: (T) -> Boolean, // the filter predicate
+ mapper: (T) -> R // the mapper function
) = publish<R>(context) {
- for (x in this@fusedFilterMap) // iterate of the source stream
- if (predicate(x)) // filter part
- send(mapper(x)) // map part
+ consumeEach { // consume the source stream
+ if (predicate(it)) // filter part
+ send(mapper(it)) // map part
+ }
}
fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
@@ -37,7 +39,7 @@
}
fun main(args: Array<String>) = runBlocking<Unit> {
- val result = range(context, 1, 5)
+ range(context, 1, 5)
.fusedFilterMap(context, { it % 2 == 0}, { "$it is even" })
- for (x in result) println(x) // print all strings from result
+ .consumeEach { println(it) } // print all the resulting strings
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt
index 2ece476..bbf97fd 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt
@@ -17,17 +17,20 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.operators.example03
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.open
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
+import kotlinx.coroutines.experimental.selects.whileSelect
import org.reactivestreams.Publisher
import kotlin.coroutines.experimental.CoroutineContext
-import kotlinx.coroutines.experimental.selects.whileSelect
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
- this@takeUntil.open().use { thisChannel -> // open channel to Publisher<T>
- other.open().use { otherChannel -> // open channel to Publisher<U>
+ this@takeUntil.open().use { thisChannel -> // explicitly open channel to Publisher<T>
+ other.open().use { otherChannel -> // explicitly open channel to Publisher<U>
whileSelect {
- otherChannel.onReceive { false } // bail out on any received element from `other`
+ otherChannel.onReceive { false } // bail out on any received element from `other`
thisChannel.onReceive { send(it); true } // resend element from this channel and continue
}
}
@@ -42,7 +45,7 @@
}
fun main(args: Array<String>) = runBlocking<Unit> {
- val slowNums = rangeWithInterval(context, 200, 1, 10) // numbers with 200ms interval
- val stop = rangeWithInterval(context, 500, 1, 10) // the first one after 500ms
- for (x in slowNums.takeUntil(context, stop)) println(x) // let's test it
+ val slowNums = rangeWithInterval(context, 200, 1, 10) // numbers with 200ms interval
+ val stop = rangeWithInterval(context, 500, 1, 10) // the first one after 500ms
+ slowNums.takeUntil(context, stop).consumeEach { println(it) } // let's test it
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt
index 76eb713..ad3ebe3 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt
@@ -17,15 +17,18 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.operators.example04
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
import org.reactivestreams.Publisher
import kotlin.coroutines.experimental.CoroutineContext
fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
- for (pub in this@merge) { // for each publisher received on the source channel
- launch(this.context) { // launch a child coroutine
- for (x in pub) send(x) // resend all element from this publisher
+ consumeEach { pub -> // for each publisher received on the source channel
+ launch(this.context) { // launch a child coroutine
+ pub.consumeEach { send(it) } // resend all elements from this publisher
}
}
}
@@ -45,5 +48,5 @@
}
fun main(args: Array<String>) = runBlocking<Unit> {
- for (x in testPub(context).merge(context)) println(x) // print the whole stream
+ testPub(context).merge(context).consumeEach { println(it) } // print the whole stream
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
index 6ce61be..4f26b56 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
@@ -8,52 +8,34 @@
@Test
fun testGuideReactiveBasicExample01() {
test { guide.reactive.basic.example01.main(emptyArray()) }.verifyLines(
- "First three:",
+ "Elements:",
"Begin",
"1",
"2",
"3",
- "Remaining:",
- "4",
- "5",
- "6"
+ "Again:"
)
}
@Test
fun testGuideReactiveBasicExample02() {
test { guide.reactive.basic.example02.main(emptyArray()) }.verifyLines(
- "First three:",
+ "Elements:",
"Begin",
"1",
"2",
"3",
- "Remaining:",
+ "Again:",
"Begin",
"1",
"2",
- "3",
- "4",
- "5",
- "6"
+ "3"
)
}
@Test
fun testGuideReactiveBasicExample03() {
test { guide.reactive.basic.example03.main(emptyArray()) }.verifyLines(
- "First pass 1",
- "First pass 2",
- "First pass 3",
- "Second pass 1",
- "Second pass 2",
- "Second pass 3"
- )
- }
-
- @Test
- fun testGuideReactiveBasicExample04() {
- test { guide.reactive.basic.example04.main(emptyArray()) }.verifyLines(
"OnSubscribe",
"1",
"2",
@@ -63,8 +45,8 @@
}
@Test
- fun testGuideReactiveBasicExample05() {
- test { guide.reactive.basic.example05.main(emptyArray()) }.verifyLines(
+ fun testGuideReactiveBasicExample04() {
+ test { guide.reactive.basic.example04.main(emptyArray()) }.verifyLines(
"OnSubscribe",
"1",
"2",
@@ -76,8 +58,8 @@
}
@Test
- fun testGuideReactiveBasicExample06() {
- test { guide.reactive.basic.example06.main(emptyArray()) }.verifyLines(
+ fun testGuideReactiveBasicExample05() {
+ test { guide.reactive.basic.example05.main(emptyArray()) }.verifyLines(
"Sending 1 ...",
"Sending 2 ...",
"Received 1",
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt
index 6a0a0d1..fdd10c1 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt
@@ -52,16 +52,16 @@
@Test
fun testEmpty(): Unit = runBlocking {
- val pub = rxObservable<String>(ctx(context)) {
+ val observable = rxObservable<String>(ctx(context)) {
if (delay) delay(1)
// does not send anything
}
- assertNSE { pub.awaitFirst() }
- assertThat(pub.awaitFirstOrDefault("OK"), IsEqual("OK"))
- assertNSE { pub.awaitLast() }
- assertNSE { pub.awaitSingle() }
+ assertNSE { observable.awaitFirst() }
+ assertThat(observable.awaitFirstOrDefault("OK"), IsEqual("OK"))
+ assertNSE { observable.awaitLast() }
+ assertNSE { observable.awaitSingle() }
var cnt = 0
- for (t in pub) {
+ observable.consumeEach {
cnt++
}
assertThat(cnt, IsEqual(0))
@@ -77,8 +77,8 @@
assertThat(observable.awaitLast(), IsEqual("OK"))
assertThat(observable.awaitSingle(), IsEqual("OK"))
var cnt = 0
- for (t in observable) {
- assertThat(t, IsEqual("OK"))
+ observable.consumeEach {
+ assertThat(it, IsEqual("OK"))
cnt++
}
assertThat(cnt, IsEqual(1))
@@ -104,8 +104,8 @@
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
var last = 0
- for (t in observable) {
- assertThat(t, IsEqual(++last))
+ observable.consumeEach {
+ assertThat(it, IsEqual(++last))
}
assertThat(last, IsEqual(n))
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableCompletionStressTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableCompletionStressTest.kt
index 2af86ce..bf61ba1 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableCompletionStressTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableCompletionStressTest.kt
@@ -14,13 +14,11 @@
* limitations under the License.
*/
-package kotlinx.coroutines.experimental.rx1
+package kotlinx.coroutines.experimental.rx2
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.runBlocking
-import kotlinx.coroutines.experimental.rx2.iterator
-import kotlinx.coroutines.experimental.rx2.rxObservable
import kotlinx.coroutines.experimental.withTimeout
import org.junit.Test
import java.util.*
@@ -41,7 +39,7 @@
runBlocking {
withTimeout(5000) {
var received = 0
- for (x in range(CommonPool, 1, count)) {
+ range(CommonPool, 1, count).consumeEach { x ->
received++
if (x != received) error("$x != $received")
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableMultiTest.kt
index bf5503a..0f36404 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableMultiTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableMultiTest.kt
@@ -16,13 +16,13 @@
package kotlinx.coroutines.experimental.rx2
+import io.reactivex.Observable
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.Unconfined
import kotlinx.coroutines.experimental.launch
import org.junit.Assert.assertEquals
import org.junit.Test
-import io.reactivex.Observable
import java.io.IOException
/**
@@ -63,8 +63,7 @@
fun testIteratorResendUnconfined() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable(Unconfined) {
- for (x in Observable.range(0, n))
- send(x)
+ Observable.range(0, n).consumeEach { send(it) }
}
checkSingleValue(observable.toList()) { list ->
assertEquals((0..n - 1).toList(), list)
@@ -75,8 +74,7 @@
fun testIteratorResendPool() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable(CommonPool) {
- for (x in Observable.range(0, n))
- send(x)
+ Observable.range(0, n).consumeEach { send(it) }
}
checkSingleValue(observable.toList()) { list ->
assertEquals((0..n - 1).toList(), list)
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableSingleTest.kt
index b9b45ff..a20a6be 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableSingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableSingleTest.kt
@@ -152,8 +152,7 @@
fun testObservableIteration() {
val observable = rxObservable(CommonPool) {
var result = ""
- for (x in Observable.just("O", "K"))
- result += x
+ Observable.just("O", "K").consumeEach { result += it }
send(result)
}
@@ -166,8 +165,7 @@
fun testObservableIterationFailure() {
val observable = rxObservable(CommonPool) {
try {
- for (x in Observable.error<String>(RuntimeException("OK")))
- fail("Should not be here")
+ Observable.error<String>(RuntimeException("OK")).consumeEach { fail("Should not be here") }
send("Fail")
} catch (e: RuntimeException) {
send(e.message!!)