Introduced `consumeEach` for channels and reactive streams, deprecated iteration on reactive streams
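
The motivation can be illustrated with a minimal sketch that uses only the Kotlin standard library. It is not the library's code: `FakeSubscription` and `consumeEachSketch` are hypothetical names invented for this illustration. The sketch assumes a subscription behaves like a `Closeable` that can be iterated. A plain `for` loop has no hook to close it when the loop body throws, whereas wrapping the loop in `use` (the shape that the new `consumeEach` takes in this patch) always closes it.

```kotlin
import java.io.Closeable

// Hypothetical stand-in for a subscription channel: iterable and closeable.
class FakeSubscription(private val items: List<Int>) : Closeable, Iterable<Int> {
    var closed = false
        private set

    override fun iterator(): Iterator<Int> = items.iterator()

    override fun close() {
        closed = true
    }
}

// Mirrors the shape of consumeEach in this patch: iterate inside `use`,
// so close() runs whether the loop finishes normally or throws.
fun FakeSubscription.consumeEachSketch(action: (Int) -> Unit) {
    this.use { sub ->
        for (x in sub) action(x)
    }
}

fun main() {
    // Plain iteration: an exception in the loop body skips any close() call.
    val leaked = FakeSubscription(listOf(1, 2, 3))
    try {
        for (x in leaked) if (x == 2) throw IllegalStateException("boom")
    } catch (e: IllegalStateException) {
        // ignored: we only care about the state of the subscription
    }
    println("plain loop closed: ${leaked.closed}") // prints "plain loop closed: false"

    // Iteration through the sketch: `use` closes the subscription on exception.
    val safe = FakeSubscription(listOf(1, 2, 3))
    try {
        safe.consumeEachSketch { if (it == 2) throw IllegalStateException("boom") }
    } catch (e: IllegalStateException) {
        // ignored: we only care about the state of the subscription
    }
    println("consumeEach sketch closed: ${safe.closed}") // prints "consumeEach sketch closed: true"
}
```

The real `consumeEach` functions added here are `suspend` functions over `SubscriptionReceiveChannel`, so the sketch only mirrors their open-iterate-close structure.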
diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md
index 0ebb6ca..18005cb 100644
--- a/reactive/coroutines-guide-reactive.md
+++ b/reactive/coroutines-guide-reactive.md
@@ -103,25 +103,23 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
- // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+ // create a channel that produces numbers from 1 to 3 with 200ms delays between them
val source = produce<Int>(context) {
println("Begin") // mark the beginning of this coroutine in output
- for (x in 1..6) {
+ for (x in 1..3) {
delay(200) // wait for 200ms
send(x) // send number x to the channel
}
}
- // print the first 3 elements from this channel
- println("First three:")
- var cnt = 0
- for (x in source) { // iterate over the source to receive elements from it
- println(x)
- if (++cnt >= 3) break // break when 3 elements are printed
+ // print elements from the source
+ println("Elements:")
+ source.consumeEach { // consume elements from it
+ println(it)
}
- // print the remaining elements from this source
- println("Remaining:")
- for (x in source) {
- println(x)
+ // print elements from the source AGAIN
+ println("Again:")
+ source.consumeEach { // consume elements from it
+ println(it)
}
}
```
@@ -131,21 +129,22 @@
This code produces the following output:
```text
-First three:
+Elements:
Begin
1
2
3
-Remaining:
-4
-5
-6
+Again:
```
<!--- TEST -->
-Notice, how "Begin" line was printed just once, because [publish] _coroutine builder_, when it is executed,
-launches one coroutine to produce a stream of elements.
+Notice how the "Begin" line was printed just once, because the [produce] _coroutine builder_, when it is executed,
+launches one coroutine to produce a stream of elements. All the produced elements are consumed
+with the [ReceiveChannel.consumeEach][consumeEach]
+extension function. There is no way to receive the elements from this
+channel again. The channel is closed when the producer coroutine completes, so a second attempt
+to consume it receives nothing.
Let us rewrite this code using [publish] coroutine builder from `kotlinx-coroutines-reactive` module
instead of [produce] from `kotlinx-coroutines-core` module. The code stays the same,
@@ -160,26 +159,24 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
- // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+ // create a publisher that produces numbers from 1 to 3 with 200ms delays between them
val source = publish<Int>(context) {
// ^^^^^^^ <--- Difference from the previous examples is here
println("Begin") // mark the beginning of this coroutine in output
- for (x in 1..6) {
+ for (x in 1..3) {
delay(200) // wait for 200ms
send(x) // send number x to the channel
}
}
- // print the first 3 elements from this channel
- println("First three:")
- var cnt = 0
- for (x in source) { // iterate over the source to receive elements from it
- println(x)
- if (++cnt >= 3) break // break when 3 elements are printed
+ // print elements from the source
+ println("Elements:")
+ source.consumeEach { // consume elements from it
+ println(it)
}
- // print the remaining elements from this source
- println("Remaining:")
- for (x in source) {
- println(x)
+ // print elements from the source AGAIN
+ println("Again:")
+ source.consumeEach { // consume elements from it
+ println(it)
}
}
```
@@ -189,19 +186,16 @@
Now the output of this code changes to:
```text
-First three:
+Elements:
Begin
1
2
3
-Remaining:
+Again:
Begin
1
2
3
-4
-5
-6
```
<!--- TEST -->
@@ -212,44 +206,11 @@
a different stream of elements, depending on how the corresponding implementation of `Publisher` works.
The [publish] coroutine builder, that is used in the above example, launches a fresh coroutine on each subscription.
-An iteration over an instance of `Publisher` with `for (x in source)` statement
-opens the channel to this publisher, creating a fresh subscription.
-We have two `for (x in source)` statements in this code and that is why we see "Begin" printed twice.
+Every [Publisher.consumeEach][org.reactivestreams.Publisher.consumeEach] invocation creates a fresh subscription.
+We have two of them in this code and that is why we see "Begin" printed twice.
In Rx lingo this is called a _cold_ publisher. Many standard Rx operators produce cold streams, too. We can iterate
-over them from a coroutine, and every iteration produces the same stream of elements as the following
-example shows:
-
-<!--- INCLUDE
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
--->
-
-```kotlin
-fun main(args: Array<String>) = runBlocking<Unit> {
- val source = Flowable.range(1, 3) // Rx 2.x operator to produce a range of integers
- // iterate over the source once
- for (x in source) println("First pass $x")
- // iterate over the source again
- for (x in source) println("Second pass $x")
-}
-```
-
-> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt)
-
-The output is:
-
-```text
-First pass 1
-First pass 2
-First pass 3
-Second pass 1
-Second pass 2
-Second pass 3
-```
-
-<!--- TEST -->
+over them from a coroutine, and every subscription produces the same stream of elements.
> Note, that we can replicate the same behaviour that we saw with channels by using Rx
[publish](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#publish())
@@ -258,22 +219,10 @@
### Subscription and cancellation
-An example in the previous section contains this snippet:
-
-```kotlin
-var cnt = 0
-for (x in source) { // iterate over the source to receive elements from it
- println(x)
- if (++cnt >= 3) break // break when 3 elements are printed
-}
-```
-
-<!--- CLEAR -->
-
-It is perfectly fine code for receiving the first three elements from a channel, but it should not be generally used
-with a `Publisher` or with a similar reactive stream. When `source` is a `Publisher`, the
-`for (x in source)` statement is a shortcut for `for (x in source.open())`. Let us take a closer look at
-what [Publisher.open][org.reactivestreams.Publisher.open] does and how it can be used:
+The example in the previous section uses the `source.consumeEach { ... }` snippet to open a subscription
+and receive all the elements from it. If we need more control over what to do with
+the elements that are being received from the channel, we can use [Publisher.open][org.reactivestreams.Publisher.open]
+as shown in the following example:
<!--- INCLUDE
import io.reactivex.*
@@ -287,16 +236,17 @@
.doOnSubscribe { println("OnSubscribe") } // provide some insight
.doFinally { println("Finally") } // ... into what's going on
var cnt = 0
- val channel = source.open() // open channel to the source
- for (x in channel) { // iterate over the channel to receive elements from it
- println(x)
- if (++cnt >= 3) break // break when 3 elements are printed
+ source.open().use { channel -> // open channel to the source
+ for (x in channel) { // iterate over the channel to receive elements from it
+ println(x)
+ if (++cnt >= 3) break // break when 3 elements are printed
+ }
+ // `use` will close the channel when this block of code is complete
}
- channel.close() // close the channel
}
```
-> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt)
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt)
It produces the following output:
@@ -310,13 +260,16 @@
<!--- TEST -->
-Using an explicit `open` we can [close][SubscriptionReceiveChannel.close] the corresponding
-subscription and unsubscribe from the source. The installed
+With an explicit `open` we should [close][SubscriptionReceiveChannel.close] the corresponding
+subscription to unsubscribe from the source. However, instead of invoking `close` explicitly,
+this code relies on the [use](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html)
+function from Kotlin's standard library.
+The installed
[doFinally](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#doFinally(io.reactivex.functions.Action))
-listener prints "Finally" to confirm that this is actually what is happening.
+listener prints "Finally" to confirm that the subscription is actually being closed.
We do not need to use an explicit `close` if iteration is performed over all the items that are emitted
-by the publisher:
+by the publisher, because `consumeEach` closes the subscription automatically:
<!--- INCLUDE
import io.reactivex.*
@@ -330,11 +283,11 @@
.doOnSubscribe { println("OnSubscribe") } // provide some insight
.doFinally { println("Finally") } // ... into what's going on
// iterate over the source fully
- for (x in source) println(x)
+ source.consumeEach { println(it) }
}
```
-> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt)
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt)
We get the following output:
@@ -352,7 +305,7 @@
Notice, how "Finally" is printed before the last element "5". It happens because our `main` function in this
example is a coroutine that we start with [runBlocking] coroutine builder.
-Our main coroutine receives on the channel using `for (x in source)` statement.
+Our main coroutine receives on the channel using the `source.consumeEach { ... }` expression.
The main coroutine is _suspended_ while it waits for the source to emit an item.
When the last item is emitted by `Flowable.range(1, 5)` it
_resumes_ the main coroutine, which gets dispatched onto the main thread to print this
@@ -403,7 +356,7 @@
}
```
-> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt)
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt)
The output of this code nicely illustrates how backpressure works with coroutines:
@@ -470,7 +423,7 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
- for (x in range(CommonPool, 1, 5)) println(x)
+ range(CommonPool, 1, 5).consumeEach { println(it) }
}
```
@@ -505,13 +458,14 @@
```kotlin
fun <T, R> Publisher<T>.fusedFilterMap(
- context: CoroutineContext, // the context to execute this coroutine in
- predicate: (T) -> Boolean, // the filter predicate
- mapper: (T) -> R // the mapper function
+ context: CoroutineContext, // the context to execute this coroutine in
+ predicate: (T) -> Boolean, // the filter predicate
+ mapper: (T) -> R // the mapper function
) = publish<R>(context) {
- for (x in this@fusedFilterMap) // iterate of the source stream
- if (predicate(x)) // filter part
- send(mapper(x)) // map part
+ consumeEach { // consume the source stream
+ if (predicate(it)) // filter part
+ send(mapper(it)) // map part
+ }
}
```
@@ -527,9 +481,9 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
- val result = range(context, 1, 5)
+ range(context, 1, 5)
.fusedFilterMap(context, { it % 2 == 0}, { "$it is even" })
- for (x in result) println(x) // print all strings from result
+ .consumeEach { println(it) } // print all the resulting strings
}
```
@@ -563,10 +517,10 @@
```kotlin
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
- this@takeUntil.open().use { thisChannel -> // open channel to Publisher<T>
- other.open().use { otherChannel -> // open channel to Publisher<U>
+ this@takeUntil.open().use { thisChannel -> // explicitly open channel to Publisher<T>
+ other.open().use { otherChannel -> // explicitly open channel to Publisher<U>
whileSelect {
- otherChannel.onReceive { false } // bail out on any received element from `other`
+ otherChannel.onReceive { false } // bail out on any received element from `other`
thisChannel.onReceive { send(it); true } // resend element from this channel and continue
}
}
@@ -597,9 +551,9 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
- val slowNums = rangeWithInterval(context, 200, 1, 10) // numbers with 200ms interval
- val stop = rangeWithInterval(context, 500, 1, 10) // the first one after 500ms
- for (x in slowNums.takeUntil(context, stop)) println(x) // let's test it
+ val slowNums = rangeWithInterval(context, 200, 1, 10) // numbers with 200ms interval
+ val stop = rangeWithInterval(context, 500, 1, 10) // the first one after 500ms
+ slowNums.takeUntil(context, stop).consumeEach { println(it) } // let's test it
}
```
@@ -631,9 +585,9 @@
```kotlin
fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
- for (pub in this@merge) { // for each publisher received on the source channel
- launch(this.context) { // launch a child coroutine
- for (x in pub) send(x) // resend all element from this publisher
+ consumeEach { pub -> // for each publisher received on the source channel
+ launch(this.context) { // launch a child coroutine
+ pub.consumeEach { send(it) } // resend all elements from this publisher
}
}
}
@@ -671,7 +625,7 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
- for (x in testPub(context).merge(context)) println(x) // print the whole stream
+ testPub(context).merge(context).consumeEach { println(it) } // print the whole stream
}
```
@@ -718,9 +672,8 @@
BiFunction { x, _ -> x })
fun main(args: Array<String>) {
- rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3).subscribe { x ->
- println("$x on thread ${Thread.currentThread().name}")
- }
+ rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+ .subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
```
@@ -762,9 +715,7 @@
fun main(args: Array<String>) {
Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
- .subscribe { x ->
- println("$x on thread ${Thread.currentThread().name}")
- }
+ .subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
```
@@ -815,9 +766,7 @@
fun main(args: Array<String>) {
Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
.observeOn(Schedulers.computation()) // <-- THIS LINE IS ADDED
- .subscribe { x ->
- println("$x on thread ${Thread.currentThread().name}")
- }
+ .subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
```
@@ -857,8 +806,8 @@
BiFunction { x, _ -> x })
fun main(args: Array<String>) = runBlocking<Unit> {
- for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
- println("$x on thread ${Thread.currentThread().name}")
+ rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+ .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
}
```
@@ -903,8 +852,8 @@
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch(Unconfined) { // launch new coroutine in Unconfined context (without its own thread pool)
- for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
- println("$x on thread ${Thread.currentThread().name}")
+ rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+ .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
}
job.join() // wait for our coroutine to complete
}
@@ -925,7 +874,7 @@
Note, that [Unconfined] context shall be used with care. It may improve the overall performance on certain tests,
due to the increased stack-locality of operations and less scheduling overhead, but it also produces deeper stacks
-and makes it harder to reason about asynchonity of the code that is using it.
+and makes it harder to reason about asynchronicity of the code that is using it.
If a coroutine sends an element to a channel, then the thread that invoked the
[send][SendChannel.send] may start executing the code of a coroutine with [Unconfined] dispatcher.
@@ -949,6 +898,7 @@
[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
+[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
<!--- INDEX kotlinx.coroutines.experimental.selects -->
@@ -958,6 +908,7 @@
<!--- DOCS_ROOT reactive/kotlinx-coroutines-reactive/target/dokka/kotlinx-coroutines-reactive -->
<!--- INDEX kotlinx.coroutines.experimental.reactive -->
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/publish.html
+[org.reactivestreams.Publisher.consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/consume-each.html
[org.reactivestreams.Publisher.open]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/open.html
[SubscriptionReceiveChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/-subscription-receive-channel/close.html
<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2 -->
diff --git a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Channel.kt
index c4dba88..ad5a0a2 100644
--- a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Channel.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Channel.kt
@@ -50,8 +50,24 @@
* This is a shortcut for `open().iterator()`. See [open] if you need an ability to manually
* unsubscribe from the observable.
*/
+
+@Suppress("DeprecatedCallableAddReplaceWith")
+@Deprecated(message =
+ "This iteration operator for `for (x in source) { ... }` loop is deprecated, " +
+ "because it can leave subscriptions unclosed when an exception is thrown. " +
+ "Use `source.consumeEach { x -> ... }`.")
public operator fun <T> Publisher<T>.iterator() = open().iterator()
+/**
+ * Subscribes to this [Publisher] and performs the specified action for each received element.
+ */
+// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
+public suspend fun <T> Publisher<T>.consumeEach(action: suspend (T) -> Unit) {
+ open().use { channel ->
+ for (x in channel) action(x)
+ }
+}
+
private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Subscriber<T> {
@Volatile
@JvmField
diff --git a/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/IntegrationTest.kt
index c5d8ca8..f80fe1c 100644
--- a/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/IntegrationTest.kt
@@ -61,9 +61,7 @@
assertNSE { pub.awaitLast() }
assertNSE { pub.awaitSingle() }
var cnt = 0
- for (t in pub) {
- cnt++
- }
+ pub.consumeEach { cnt++ }
assertThat(cnt, IsEqual(0))
}
@@ -78,8 +76,8 @@
assertThat(pub.awaitLast(), IsEqual("OK"))
assertThat(pub.awaitSingle(), IsEqual("OK"))
var cnt = 0
- for (t in pub) {
- assertThat(t, IsEqual("OK"))
+ pub.consumeEach {
+ assertThat(it, IsEqual("OK"))
cnt++
}
assertThat(cnt, IsEqual(1))
@@ -106,8 +104,8 @@
private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {
var last = 0
- for (t in pub) {
- assertThat(t, IsEqual(++last))
+ pub.consumeEach {
+ assertThat(it, IsEqual(++last))
}
assertThat(last, IsEqual(n))
}
diff --git a/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherCompletionStressTest.kt b/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherCompletionStressTest.kt
index a5948b9..a1f5fa1 100644
--- a/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherCompletionStressTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherCompletionStressTest.kt
@@ -39,7 +39,7 @@
runBlocking {
withTimeout(5000) {
var received = 0
- for (x in range(CommonPool, 1, count)) {
+ range(CommonPool, 1, count).consumeEach { x ->
received++
if (x != received) error("$x != $received")
}
diff --git a/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherMultiTest.kt b/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherMultiTest.kt
index d3aa13b..954aae1 100644
--- a/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherMultiTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/test/kotlin/kotlinx/coroutines/experimental/reactive/PublisherMultiTest.kt
@@ -43,8 +43,8 @@
jobs.forEach { it.join() }
}
val resultSet = mutableSetOf<Int>()
- for (t in observable) {
- assertTrue(resultSet.add(t))
+ observable.consumeEach {
+ assertTrue(resultSet.add(it))
}
assertThat(resultSet.size, IsEqual(n))
}
diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt
index 6921be3..06bd13e 100644
--- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt
@@ -53,8 +53,23 @@
* This is a shortcut for `open().iterator()`. See [open] if you need an ability to manually
* unsubscribe from the observable.
*/
+@Suppress("DeprecatedCallableAddReplaceWith")
+@Deprecated(message =
+"This iteration operator for `for (x in source) { ... }` loop is deprecated, " +
+ "because it can leave subscriptions unclosed when an exception is thrown. " +
+ "Use `source.consumeEach { x -> ... }`.")
public operator fun <T> Observable<T>.iterator() = open().iterator()
+/**
+ * Subscribes to this [Observable] and performs the specified action for each received element.
+ */
+// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
+public suspend fun <T> Observable<T>.consumeEach(action: suspend (T) -> Unit) {
+ open().use { channel ->
+ for (x in channel) action(x)
+ }
+}
+
private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T> {
@JvmField
val subscriber: ChannelSubscriber = ChannelSubscriber()
diff --git a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/IntegrationTest.kt
index 491250a..410c38d 100644
--- a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/IntegrationTest.kt
@@ -52,16 +52,16 @@
@Test
fun testEmpty(): Unit = runBlocking {
- val pub = rxObservable<String>(ctx(context)) {
+ val observable = rxObservable<String>(ctx(context)) {
if (delay) delay(1)
// does not send anything
}
- assertNSE { pub.awaitFirst() }
- assertThat(pub.awaitFirstOrDefault("OK"), IsEqual("OK"))
- assertNSE { pub.awaitLast() }
- assertNSE { pub.awaitSingle() }
+ assertNSE { observable.awaitFirst() }
+ assertThat(observable.awaitFirstOrDefault("OK"), IsEqual("OK"))
+ assertNSE { observable.awaitLast() }
+ assertNSE { observable.awaitSingle() }
var cnt = 0
- for (t in pub) {
+ observable.consumeEach {
cnt++
}
assertThat(cnt, IsEqual(0))
@@ -77,8 +77,8 @@
assertThat(observable.awaitLast(), IsEqual("OK"))
assertThat(observable.awaitSingle(), IsEqual("OK"))
var cnt = 0
- for (t in observable) {
- assertThat(t, IsEqual("OK"))
+ observable.consumeEach {
+ assertThat(it, IsEqual("OK"))
cnt++
}
assertThat(cnt, IsEqual(1))
@@ -104,8 +104,8 @@
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
var last = 0
- for (t in observable) {
- assertThat(t, IsEqual(++last))
+ observable.consumeEach {
+ assertThat(it, IsEqual(++last))
}
assertThat(last, IsEqual(n))
}
diff --git a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableCompletionStressTest.kt b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableCompletionStressTest.kt
index acc6479..bd2d915 100644
--- a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableCompletionStressTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableCompletionStressTest.kt
@@ -39,7 +39,7 @@
runBlocking {
withTimeout(5000) {
var received = 0
- for (x in range(CommonPool, 1, count)) {
+ range(CommonPool, 1, count).consumeEach { x ->
received++
if (x != received) error("$x != $received")
}
diff --git a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableMultiTest.kt
index 5628c32..427b207 100644
--- a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableMultiTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableMultiTest.kt
@@ -63,8 +63,7 @@
fun testIteratorResendUnconfined() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable(Unconfined) {
- for (x in Observable.range(0, n))
- send(x)
+ Observable.range(0, n).consumeEach { send(it) }
}
checkSingleValue(observable.toList()) { list ->
assertEquals((0..n - 1).toList(), list)
@@ -75,8 +74,7 @@
fun testIteratorResendPool() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable(CommonPool) {
- for (x in Observable.range(0, n))
- send(x)
+ Observable.range(0, n).consumeEach { send(it) }
}
checkSingleValue(observable.toList()) { list ->
assertEquals((0..n - 1).toList(), list)
diff --git a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableSingleTest.kt
index 26f1221..873810d 100644
--- a/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableSingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/test/kotlin/kotlinx/coroutines/experimental/rx1/ObservableSingleTest.kt
@@ -163,8 +163,7 @@
fun testObservableIteration() {
val observable = rxObservable(CommonPool) {
var result = ""
- for (x in Observable.just("O", "K"))
- result += x
+ Observable.just("O", "K").consumeEach { result += it }
send(result)
}
@@ -177,8 +176,7 @@
fun testObservableIterationFailure() {
val observable = rxObservable(CommonPool) {
try {
- for (x in Observable.error<String>(RuntimeException("OK")))
- fail("Should not be here")
+ Observable.error<String>(RuntimeException("OK")).consumeEach { fail("Should not be here") }
send("Fail")
} catch (e: RuntimeException) {
send(e.message!!)
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt
index 5da80bc..e58116c 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt
@@ -25,7 +25,7 @@
import java.io.Closeable
/**
- * Return type for [Observable.open] that can be used to [receive] elements from the
+ * Return type for [ObservableSource.open] that can be used to [receive] elements from the
* subscription and to manually [close] it.
*/
public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
@@ -36,7 +36,7 @@
}
/**
- * Subscribes to this [Observable] and returns a channel to receive elements emitted by it.
+ * Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it.
*/
public fun <T> ObservableSource<T>.open(): SubscriptionReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
@@ -50,8 +50,23 @@
* This is a shortcut for `open().iterator()`. See [open] if you need an ability to manually
* unsubscribe from the observable.
*/
+@Suppress("DeprecatedCallableAddReplaceWith")
+@Deprecated(message =
+"This iteration operator for `for (x in source) { ... }` loop is deprecated, " +
+ "because it can leave subscriptions unclosed when an exception is thrown. " +
+ "Use `source.consumeEach { x -> ... }`.")
public operator fun <T> ObservableSource<T>.iterator() = open().iterator()
+/**
+ * Subscribes to this [ObservableSource] and performs the specified action for each received element.
+ */
+// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
+public suspend fun <T> ObservableSource<T>.consumeEach(action: suspend (T) -> Unit) {
+ open().use { channel ->
+ for (x in channel) action(x)
+ }
+}
+
private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Observer<T> {
@Volatile
var subscription: Disposable? = null
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt
index 8a2acb9..b7423e3 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt
@@ -17,28 +17,28 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.basic.example01
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.channels.consumeEach
+import kotlinx.coroutines.experimental.channels.produce
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.runBlocking
fun main(args: Array<String>) = runBlocking<Unit> {
- // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+ // create a channel that produces numbers from 1 to 3 with 200ms delays between them
val source = produce<Int>(context) {
println("Begin") // mark the beginning of this coroutine in output
- for (x in 1..6) {
+ for (x in 1..3) {
delay(200) // wait for 200ms
send(x) // send number x to the channel
}
}
- // print the first 3 elements from this channel
- println("First three:")
- var cnt = 0
- for (x in source) { // iterate over the source to receive elements from it
- println(x)
- if (++cnt >= 3) break // break when 3 elements are printed
+ // print elements from the source
+ println("Elements:")
+ source.consumeEach { // consume elements from it
+ println(it)
}
- // print the remaining elements from this source
- println("Remaining:")
- for (x in source) {
- println(x)
+ // print elements from the source AGAIN
+ println("Again:")
+ source.consumeEach { // consume elements from it
+ println(it)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt
index 5c31b3e..5f9d9d0 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt
@@ -17,29 +17,29 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.basic.example02
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
fun main(args: Array<String>) = runBlocking<Unit> {
- // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+ // create a publisher that produces numbers from 1 to 3 with 200ms delays between them
val source = publish<Int>(context) {
// ^^^^^^^ <--- Difference from the previous examples is here
println("Begin") // mark the beginning of this coroutine in output
- for (x in 1..6) {
+ for (x in 1..3) {
delay(200) // wait for 200ms
send(x) // send number x to the channel
}
}
- // print the first 3 elements from this channel
- println("First three:")
- var cnt = 0
- for (x in source) { // iterate over the source to receive elements from it
- println(x)
- if (++cnt >= 3) break // break when 3 elements are printed
+ // print elements from the source
+ println("Elements:")
+ source.consumeEach { // consume elements from it
+ println(it)
}
- // print the remaining elements from this source
- println("Remaining:")
- for (x in source) {
- println(x)
+ // print elements from the source AGAIN
+ println("Again:")
+ source.consumeEach { // consume elements from it
+ println(it)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt
index 95c098d..b0f0068 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt
@@ -17,14 +17,20 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.basic.example03
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import kotlinx.coroutines.experimental.reactive.open
+import kotlinx.coroutines.experimental.runBlocking
fun main(args: Array<String>) = runBlocking<Unit> {
- val source = Flowable.range(1, 3) // Rx 2.x operator to produce a range of integers
- // iterate over the source once
- for (x in source) println("First pass $x")
- // iterate over the source again
- for (x in source) println("Second pass $x")
+ val source = Flowable.range(1, 5) // a range of five numbers
+ .doOnSubscribe { println("OnSubscribe") } // provide some insight
+ .doFinally { println("Finally") } // ... into what's going on
+ var cnt = 0
+ source.open().use { channel -> // open channel to the source
+ for (x in channel) { // iterate over the channel to receive elements from it
+ println(x)
+ if (++cnt >= 3) break // break when 3 elements are printed
+ }
+ // `use` will close the channel when this block of code is complete
+ }
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt
index b00da76..e41d8dc 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt
@@ -17,19 +17,14 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.basic.example04
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.runBlocking
fun main(args: Array<String>) = runBlocking<Unit> {
val source = Flowable.range(1, 5) // a range of five numbers
.doOnSubscribe { println("OnSubscribe") } // provide some insight
.doFinally { println("Finally") } // ... into what's going on
- var cnt = 0
- val channel = source.open() // open channel to the source
- for (x in channel) { // iterate over the channel to receive elements from it
- println(x)
- if (++cnt >= 3) break // break when 3 elements are printed
- }
- channel.close() // close the channel
+ // iterate over the source fully
+ source.consumeEach { println(it) }
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt
index 8fe2b5b..23c6e04 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt
@@ -17,14 +17,26 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.basic.example05
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.runBlocking
+import kotlinx.coroutines.experimental.rx2.rxFlowable
-fun main(args: Array<String>) = runBlocking<Unit> {
- val source = Flowable.range(1, 5) // a range of five numbers
- .doOnSubscribe { println("OnSubscribe") } // provide some insight
- .doFinally { println("Finally") } // ... into what's going on
- // iterate over the source fully
- for (x in source) println(x)
+fun main(args: Array<String>) = runBlocking<Unit> {
+ // coroutine -- fast producer of elements in the context of the main thread
+ val source = rxFlowable(context) {
+ for (x in 1..5) {
+ println("Sending $x ...")
+ send(x) // this is a suspending function
+ }
+ }
+ // subscribe on another thread with a slow subscriber using Rx
+ source
+ .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
+ .doOnComplete { println("Complete") }
+ .subscribe { x ->
+ println("Received $x")
+ Thread.sleep(200) // 200 ms to process each item
+ }
+ delay(2000) // suspend main thread for a couple of seconds
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt
deleted file mode 100644
index 56e75da..0000000
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2016-2017 JetBrains s.r.o.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
-package guide.reactive.basic.example06
-
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.rx2.rxFlowable
-import io.reactivex.schedulers.Schedulers
-
-fun main(args: Array<String>) = runBlocking<Unit> {
- // coroutine -- fast producer of elements in the context of the main thread
- val source = rxFlowable(context) {
- for (x in 1..5) {
- println("Sending $x ...")
- send(x) // this is a suspending function
- }
- }
- // subscribe on another thread with a slow subscriber using Rx
- source
- .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
- .doOnComplete { println("Complete") }
- .subscribe { x ->
- println("Received $x")
- Thread.sleep(200) // 200 ms to process each item
- }
- delay(2000) // suspend main thread for couple of seconds
-}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt
index 02c94c4..adaede0 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt
@@ -17,7 +17,8 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.context.example01
-import io.reactivex.*
+import io.reactivex.Flowable
+import io.reactivex.Scheduler
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit
@@ -29,8 +30,7 @@
BiFunction { x, _ -> x })
fun main(args: Array<String>) {
- rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3).subscribe { x ->
- println("$x on thread ${Thread.currentThread().name}")
- }
+ rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+ .subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt
index b26ee52..4e551a5 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt
@@ -17,9 +17,10 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.context.example02
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.reactive.publish
import kotlin.coroutines.experimental.CoroutineContext
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
@@ -31,8 +32,6 @@
fun main(args: Array<String>) {
Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
- .subscribe { x ->
- println("$x on thread ${Thread.currentThread().name}")
- }
+ .subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt
index 6534fa9..20ff91c 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt
@@ -17,10 +17,11 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.context.example03
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.reactive.publish
import kotlin.coroutines.experimental.CoroutineContext
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
@@ -33,8 +34,6 @@
fun main(args: Array<String>) {
Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
.observeOn(Schedulers.computation()) // <-- THIS LINE IS ADDED
- .subscribe { x ->
- println("$x on thread ${Thread.currentThread().name}")
- }
+ .subscribe { println("$it on thread ${Thread.currentThread().name}") }
Thread.sleep(1000)
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt
index 91e72dc..0f55479 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt
@@ -17,11 +17,12 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.context.example04
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import io.reactivex.Scheduler
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.TimeUnit
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
@@ -31,6 +32,6 @@
BiFunction { x, _ -> x })
fun main(args: Array<String>) = runBlocking<Unit> {
- for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
- println("$x on thread ${Thread.currentThread().name}")
+ rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+ .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt
index 8b3dbfd..c9e493e 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt
@@ -17,11 +17,14 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.context.example05
-import io.reactivex.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.Flowable
+import io.reactivex.Scheduler
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.runBlocking
import java.util.concurrent.TimeUnit
fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
@@ -32,8 +35,8 @@
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch(Unconfined) { // launch new coroutine in Unconfined context (without its own thread pool)
- for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
- println("$x on thread ${Thread.currentThread().name}")
+ rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
+ .consumeEach { println("$it on thread ${Thread.currentThread().name}") }
}
job.join() // wait for our coroutine to complete
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt
index 217a7ef..d4ce0e0 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt
@@ -17,8 +17,10 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.operators.example01
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
import kotlin.coroutines.experimental.CoroutineContext
fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
@@ -26,5 +28,5 @@
}
fun main(args: Array<String>) = runBlocking<Unit> {
- for (x in range(CommonPool, 1, 5)) println(x)
+ range(CommonPool, 1, 5).consumeEach { println(it) }
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt
index 7cb3484..fc34009 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt
@@ -17,19 +17,21 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.operators.example02
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
import org.reactivestreams.Publisher
import kotlin.coroutines.experimental.CoroutineContext
fun <T, R> Publisher<T>.fusedFilterMap(
- context: CoroutineContext, // the context to execute this coroutine in
- predicate: (T) -> Boolean, // the filter predicate
- mapper: (T) -> R // the mapper function
+ context: CoroutineContext, // the context to execute this coroutine in
+ predicate: (T) -> Boolean, // the filter predicate
+ mapper: (T) -> R // the mapper function
) = publish<R>(context) {
- for (x in this@fusedFilterMap) // iterate of the source stream
- if (predicate(x)) // filter part
- send(mapper(x)) // map part
+ consumeEach { // consume the source stream
+ if (predicate(it)) // filter part
+ send(mapper(it)) // map part
+ }
}
fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
@@ -37,7 +39,7 @@
}
fun main(args: Array<String>) = runBlocking<Unit> {
- val result = range(context, 1, 5)
+ range(context, 1, 5)
.fusedFilterMap(context, { it % 2 == 0}, { "$it is even" })
- for (x in result) println(x) // print all strings from result
+ .consumeEach { println(it) } // print all the resulting strings
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt
index 2ece476..bbf97fd 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt
@@ -17,17 +17,20 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.operators.example03
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.open
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
+import kotlinx.coroutines.experimental.selects.whileSelect
import org.reactivestreams.Publisher
import kotlin.coroutines.experimental.CoroutineContext
-import kotlinx.coroutines.experimental.selects.whileSelect
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
- this@takeUntil.open().use { thisChannel -> // open channel to Publisher<T>
- other.open().use { otherChannel -> // open channel to Publisher<U>
+ this@takeUntil.open().use { thisChannel -> // explicitly open channel to Publisher<T>
+ other.open().use { otherChannel -> // explicitly open channel to Publisher<U>
whileSelect {
- otherChannel.onReceive { false } // bail out on any received element from `other`
+ otherChannel.onReceive { false } // bail out on any received element from `other`
thisChannel.onReceive { send(it); true } // resend element from this channel and continue
}
}
@@ -42,7 +45,7 @@
}
fun main(args: Array<String>) = runBlocking<Unit> {
- val slowNums = rangeWithInterval(context, 200, 1, 10) // numbers with 200ms interval
- val stop = rangeWithInterval(context, 500, 1, 10) // the first one after 500ms
- for (x in slowNums.takeUntil(context, stop)) println(x) // let's test it
+ val slowNums = rangeWithInterval(context, 200, 1, 10) // numbers with 200ms interval
+ val stop = rangeWithInterval(context, 500, 1, 10) // the first one after 500ms
+ slowNums.takeUntil(context, stop).consumeEach { println(it) } // let's test it
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt
index 76eb713..ad3ebe3 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt
@@ -17,15 +17,18 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package guide.reactive.operators.example04
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.delay
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.reactive.consumeEach
+import kotlinx.coroutines.experimental.reactive.publish
+import kotlinx.coroutines.experimental.runBlocking
import org.reactivestreams.Publisher
import kotlin.coroutines.experimental.CoroutineContext
fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
- for (pub in this@merge) { // for each publisher received on the source channel
- launch(this.context) { // launch a child coroutine
- for (x in pub) send(x) // resend all element from this publisher
+ consumeEach { pub -> // for each publisher received on the source channel
+ launch(this.context) { // launch a child coroutine
+ pub.consumeEach { send(it) } // resend all elements from this publisher
}
}
}
@@ -45,5 +48,5 @@
}
fun main(args: Array<String>) = runBlocking<Unit> {
- for (x in testPub(context).merge(context)) println(x) // print the whole stream
+ testPub(context).merge(context).consumeEach { println(it) } // print the whole stream
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
index 6ce61be..4f26b56 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
@@ -8,52 +8,34 @@
@Test
fun testGuideReactiveBasicExample01() {
test { guide.reactive.basic.example01.main(emptyArray()) }.verifyLines(
- "First three:",
+ "Elements:",
"Begin",
"1",
"2",
"3",
- "Remaining:",
- "4",
- "5",
- "6"
+ "Again:"
)
}
@Test
fun testGuideReactiveBasicExample02() {
test { guide.reactive.basic.example02.main(emptyArray()) }.verifyLines(
- "First three:",
+ "Elements:",
"Begin",
"1",
"2",
"3",
- "Remaining:",
+ "Again:",
"Begin",
"1",
"2",
- "3",
- "4",
- "5",
- "6"
+ "3"
)
}
@Test
fun testGuideReactiveBasicExample03() {
test { guide.reactive.basic.example03.main(emptyArray()) }.verifyLines(
- "First pass 1",
- "First pass 2",
- "First pass 3",
- "Second pass 1",
- "Second pass 2",
- "Second pass 3"
- )
- }
-
- @Test
- fun testGuideReactiveBasicExample04() {
- test { guide.reactive.basic.example04.main(emptyArray()) }.verifyLines(
"OnSubscribe",
"1",
"2",
@@ -63,8 +45,8 @@
}
@Test
- fun testGuideReactiveBasicExample05() {
- test { guide.reactive.basic.example05.main(emptyArray()) }.verifyLines(
+ fun testGuideReactiveBasicExample04() {
+ test { guide.reactive.basic.example04.main(emptyArray()) }.verifyLines(
"OnSubscribe",
"1",
"2",
@@ -76,8 +58,8 @@
}
@Test
- fun testGuideReactiveBasicExample06() {
- test { guide.reactive.basic.example06.main(emptyArray()) }.verifyLines(
+ fun testGuideReactiveBasicExample05() {
+ test { guide.reactive.basic.example05.main(emptyArray()) }.verifyLines(
"Sending 1 ...",
"Sending 2 ...",
"Received 1",
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt
index 6a0a0d1..fdd10c1 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/IntegrationTest.kt
@@ -52,16 +52,16 @@
@Test
fun testEmpty(): Unit = runBlocking {
- val pub = rxObservable<String>(ctx(context)) {
+ val observable = rxObservable<String>(ctx(context)) {
if (delay) delay(1)
// does not send anything
}
- assertNSE { pub.awaitFirst() }
- assertThat(pub.awaitFirstOrDefault("OK"), IsEqual("OK"))
- assertNSE { pub.awaitLast() }
- assertNSE { pub.awaitSingle() }
+ assertNSE { observable.awaitFirst() }
+ assertThat(observable.awaitFirstOrDefault("OK"), IsEqual("OK"))
+ assertNSE { observable.awaitLast() }
+ assertNSE { observable.awaitSingle() }
var cnt = 0
- for (t in pub) {
+ observable.consumeEach {
cnt++
}
assertThat(cnt, IsEqual(0))
@@ -77,8 +77,8 @@
assertThat(observable.awaitLast(), IsEqual("OK"))
assertThat(observable.awaitSingle(), IsEqual("OK"))
var cnt = 0
- for (t in observable) {
- assertThat(t, IsEqual("OK"))
+ observable.consumeEach {
+ assertThat(it, IsEqual("OK"))
cnt++
}
assertThat(cnt, IsEqual(1))
@@ -104,8 +104,8 @@
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
var last = 0
- for (t in observable) {
- assertThat(t, IsEqual(++last))
+ observable.consumeEach {
+ assertThat(it, IsEqual(++last))
}
assertThat(last, IsEqual(n))
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableCompletionStressTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableCompletionStressTest.kt
index 2af86ce..bf61ba1 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableCompletionStressTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableCompletionStressTest.kt
@@ -14,13 +14,11 @@
* limitations under the License.
*/
-package kotlinx.coroutines.experimental.rx1
+package kotlinx.coroutines.experimental.rx2
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.runBlocking
-import kotlinx.coroutines.experimental.rx2.iterator
-import kotlinx.coroutines.experimental.rx2.rxObservable
import kotlinx.coroutines.experimental.withTimeout
import org.junit.Test
import java.util.*
@@ -41,7 +39,7 @@
runBlocking {
withTimeout(5000) {
var received = 0
- for (x in range(CommonPool, 1, count)) {
+ range(CommonPool, 1, count).consumeEach { x ->
received++
if (x != received) error("$x != $received")
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableMultiTest.kt
index bf5503a..0f36404 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableMultiTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableMultiTest.kt
@@ -16,13 +16,13 @@
package kotlinx.coroutines.experimental.rx2
+import io.reactivex.Observable
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.Unconfined
import kotlinx.coroutines.experimental.launch
import org.junit.Assert.assertEquals
import org.junit.Test
-import io.reactivex.Observable
import java.io.IOException
/**
@@ -63,8 +63,7 @@
fun testIteratorResendUnconfined() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable(Unconfined) {
- for (x in Observable.range(0, n))
- send(x)
+ Observable.range(0, n).consumeEach { send(it) }
}
checkSingleValue(observable.toList()) { list ->
assertEquals((0..n - 1).toList(), list)
@@ -75,8 +74,7 @@
fun testIteratorResendPool() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable(CommonPool) {
- for (x in Observable.range(0, n))
- send(x)
+ Observable.range(0, n).consumeEach { send(it) }
}
checkSingleValue(observable.toList()) { list ->
assertEquals((0..n - 1).toList(), list)
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableSingleTest.kt
index b9b45ff..a20a6be 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableSingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ObservableSingleTest.kt
@@ -152,8 +152,7 @@
fun testObservableIteration() {
val observable = rxObservable(CommonPool) {
var result = ""
- for (x in Observable.just("O", "K"))
- result += x
+ Observable.just("O", "K").consumeEach { result += it }
send(result)
}
@@ -166,8 +165,7 @@
fun testObservableIterationFailure() {
val observable = rxObservable(CommonPool) {
try {
- for (x in Observable.error<String>(RuntimeException("OK")))
- fail("Should not be here")
+ Observable.error<String>(RuntimeException("OK")).consumeEach { fail("Should not be here") }
send("Fail")
} catch (e: RuntimeException) {
send(e.message!!)