Guide to reactive streams with coroutines
diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md
new file mode 100644
index 0000000..3ee6af8
--- /dev/null
+++ b/reactive/coroutines-guide-reactive.md
@@ -0,0 +1,969 @@
+<!--- INCLUDE .*/example-reactive-([a-z]+)-([0-9]+)\.kt
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.$$1.example$$2
+
+-->
+<!--- KNIT kotlinx-coroutines-rx2/src/test/kotlin/guide/.*\.kt -->
+<!--- TEST_OUT kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.test
+
+import org.junit.Test
+
+class GuideReactiveTest {
+-->
+
+# Guide to reactive streams with coroutines
+
+This guide explains key differences between Kotlin coroutines and reactive streams and shows
+how they can be used together for greater good. Prior familiarity with basic coroutine concepts
+that are covered in [Guide to kotlinx.coroutines](../coroutines-guide.md) is not required,
+but is a big plus. If you are familiar with reactive streams, you may find this guide
+a better introduction to the world of coroutines.
+
+There are several modules in `kotlinx.coroutines` project that are related to reactive streams:
+
+* [kotlinx-coroutines-reactive](kotlinx-coroutines-reactive) -- utilities for [Reactive Streams](http://www.reactive-streams.org)
+* [kotlinx-coroutines-rx1](kotlinx-coroutines-rx1) -- utilities for [RxJava 1.x](https://github.com/ReactiveX/RxJava/tree/1.x)
+* [kotlinx-coroutines-rx2](kotlinx-coroutines-rx2) -- utilities for [RxJava 2.x](https://github.com/ReactiveX/RxJava)
+
+This guide is mostly based on [Reactive Streams](http://www.reactive-streams.org) specification and uses
+its `Publisher` interface with some examples based on [RxJava 2.x](https://github.com/ReactiveX/RxJava),
+which implements reactive streams specification.
+
+You are welcome to clone
+[`kotlinx.coroutines` project](https://github.com/Kotlin/kotlinx.coroutines)
+from GitHub to your workstation in order to
+run all the presented examples. They are contained in
+[reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide](kotlinx-coroutines-rx2/src/test/kotlin/guide)
+directory of the project.
+
+## Table of contents
+
+<!--- TOC -->
+
+* [Differences between reactive streams and channels](#differences-between-reactive-streams-and-channels)
+ * [Basics of iteration](#basics-of-iteration)
+ * [Subscription and cancellation](#subscription-and-cancellation)
+ * [Backpressure](#backpressure)
+* [Operators](#operators)
+ * [Range](#range)
+ * [Fused filter-map hybrid](#fused-filter-map-hybrid)
+ * [Take until](#take-until)
+ * [Merge](#merge)
+* [Coroutine context](#coroutine-context)
+ * [Threads with Rx](#threads-with-rx)
+ * [Threads with coroutines](#threads-with-coroutines)
+ * [Rx observeOn](#rx-observeon)
+ * [Coroutine context to rule them all](#coroutine-context-to-rule-them-all)
+ * [Unconfined context](#unconfined-context)
+
+<!--- END_TOC -->
+
+## Differences between reactive streams and channels
+
+This section outlines key differences between reactive streams and coroutine-based channels.
+
+### Basics of iteration
+
+The [Channel] is a somewhat similar concept to the following reactive stream classes:
+
+* Reactive stream [Publisher](https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Publisher.java);
+* Rx Java 1.x [Observable](http://reactivex.io/RxJava/javadoc/rx/Observable.html);
+* Rx Java 2.x [Flowable](http://reactivex.io/RxJava/2.x/javadoc/), which implements `Publisher`.
+
+They all describe an asynchronous stream of elements (aka items in Rx), either infinite or finite,
+and all of them support backpressure.
+
+However, the `Channel` always represents a _hot_ stream of items, using Rx terminology. Elements are being sent
+into the channel by producer coroutines and are received from it by consumer coroutines.
+Every [receive][ReceiveChannel.receive] invocation consumes an element from the channel.
+Let us illustrate it with the following example:
+
+<!--- INCLUDE
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+-->
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+ val source = produce<Int>(context) {
+ println("Begin") // mark the beginning of this coroutine in output
+ for (x in 1..6) {
+ delay(200) // wait for 200ms
+ send(x) // send number x to the channel
+ }
+ }
+ // print the first 3 elements from this channel
+ println("First three:")
+ var cnt = 0
+ for (x in source) { // iterate over the source to receive elements from it
+ println(x)
+ if (++cnt >= 3) break // break when 3 elements are printed
+ }
+ // print the remaining elements from this source
+ println("Remaining:")
+ for (x in source) {
+ println(x)
+ }
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt)
+
+This code produces the following output:
+
+```text
+First three:
+Begin
+1
+2
+3
+Remaining:
+4
+5
+6
+```
+
+<!--- TEST -->
+
+Notice how the "Begin" line was printed just once, because the [produce] _coroutine builder_, when it is executed,
+launches one coroutine to produce a stream of elements.
+
+Let us rewrite this code using [publish] coroutine builder from `kotlinx-coroutines-reactive` module
+instead of [produce] from `kotlinx-coroutines-core` module. The code stays the same,
+but where `source` used to have [ReceiveChannel] type, it now has reactive streams
+[Publisher](http://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Publisher.html)
+type.
+
+<!--- INCLUDE
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+-->
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ // create a publisher that produces numbers from 1 to 6 with 200ms delays between them
+ val source = publish<Int>(context) {
+ // ^^^^^^^ <--- Difference from the previous examples is here
+ println("Begin") // mark the beginning of this coroutine in output
+ for (x in 1..6) {
+ delay(200) // wait for 200ms
+ send(x) // send number x to the channel
+ }
+ }
+ // print the first 3 elements from this channel
+ println("First three:")
+ var cnt = 0
+ for (x in source) { // iterate over the source to receive elements from it
+ println(x)
+ if (++cnt >= 3) break // break when 3 elements are printed
+ }
+ // print the remaining elements from this source
+ println("Remaining:")
+ for (x in source) {
+ println(x)
+ }
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt)
+
+Now the output of this code changes to:
+
+```text
+First three:
+Begin
+1
+2
+3
+Remaining:
+Begin
+1
+2
+3
+4
+5
+6
+```
+
+<!--- TEST -->
+
+This example highlights the key difference between a reactive stream and a channel. A reactive stream is a higher-order
+functional concept. While the channel _is_ a stream of elements, the reactive stream defines a recipe on how the stream of
+elements is produced. It becomes the actual stream of elements on _subscription_. Each subscriber may receive the same or
+a different stream of elements, depending on how the corresponding implementation of `Publisher` works.
+
+The [publish] coroutine builder, that is used in the above example, launches a fresh coroutine on each subscription.
+An iteration over an instance of `Publisher` with `for (x in source)` statement
+opens the channel to this publisher, creating a fresh subscription.
+We have two `for (x in source)` statements in this code and that is why we see "Begin" printed twice.
+
+In Rx lingo this is called a _cold_ publisher. Many standard Rx operators produce cold streams, too. We can iterate
+over them from a coroutine, and every iteration produces the same stream of elements as the following
+example shows:
+
+<!--- INCLUDE
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+-->
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val source = Flowable.range(1, 3) // Rx 2.x operator to produce a range of integers
+ // iterate over the source once
+ for (x in source) println("First pass $x")
+ // iterate over the source again
+ for (x in source) println("Second pass $x")
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt)
+
+The output is:
+
+```text
+First pass 1
+First pass 2
+First pass 3
+Second pass 1
+Second pass 2
+Second pass 3
+```
+
+<!--- TEST -->
+
+> Note, that we can replicate the same behaviour that we saw with channels by using Rx
+[publish](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#publish())
+operator and [connect](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/flowables/ConnectableFlowable.html#connect())
+method with it.
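+
+The note above can be illustrated with plain Rx. The following is a minimal sketch, not part of the guide's
+generated example set: it uses only `Flowable.publish` and `ConnectableFlowable.connect` from RxJava 2.x, and the
+subscriber labels are made up for illustration. Because `doOnSubscribe` sits upstream of `publish`, "Begin" should be
+printed only once, when `connect` is called, no matter how many subscribers are attached.
+
+```kotlin
+import io.reactivex.Flowable
+
+fun main(args: Array<String>) {
+    val hot = Flowable.range(1, 3)
+        .doOnSubscribe { println("Begin") } // runs when the upstream range is actually subscribed
+        .publish() // ConnectableFlowable: all subscribers share one upstream subscription
+    // attach two subscribers; nothing is emitted yet
+    hot.subscribe { x -> println("First subscriber $x") }
+    hot.subscribe { x -> println("Second subscriber $x") }
+    // subscribe to the range once and start delivering items to both subscribers
+    hot.connect()
+}
+```
+
+<!--- CLEAR -->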
+
+### Subscription and cancellation
+
+An example in the previous section contains this snippet:
+
+```kotlin
+var cnt = 0
+for (x in source) { // iterate over the source to receive elements from it
+ println(x)
+ if (++cnt >= 3) break // break when 3 elements are printed
+}
+```
+
+<!--- CLEAR -->
+
+It is perfectly fine code for receiving the first three elements from a channel, but it should not be generally used
+with a `Publisher` or with a similar reactive stream. When `source` is a `Publisher`, the
+`for (x in source)` statement is a shortcut for `for (x in source.open())`. Let us take a closer look at
+what [Publisher.open][org.reactivestreams.Publisher.open] does and how it can be used:
+
+<!--- INCLUDE
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+-->
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val source = Flowable.range(1, 5) // a range of five numbers
+ .doOnSubscribe { println("OnSubscribe") } // provide some insight
+ .doFinally { println("Finally") } // ... into what's going on
+ var cnt = 0
+ val channel = source.open() // open channel to the source
+ for (x in channel) { // iterate over the channel to receive elements from it
+ println(x)
+ if (++cnt >= 3) break // break when 3 elements are printed
+ }
+ channel.close() // close the channel
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt)
+
+It produces the following output:
+
+```text
+OnSubscribe
+1
+2
+3
+Finally
+```
+
+<!--- TEST -->
+
+Using an explicit `open` we can [close][SubscriptionReceiveChannel.close] the corresponding
+subscription and unsubscribe from the source. The installed
+[doFinally](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#doFinally(io.reactivex.functions.Action))
+listener prints "Finally" to confirm that this is actually what is happening.
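+
+An explicit `close` is easy to skip by accident, for example when the loop body throws. The following is a minimal
+sketch of an alternative, assuming that the channel returned by `open` is `Closeable` (the `takeUntil` operator later
+in this guide relies on the same property): Kotlin's `use` closes the channel on every exit path and should
+unsubscribe from the source in the same way as the explicit `close` above.
+
+```kotlin
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+    val source = Flowable.range(1, 5) // a range of five numbers
+        .doOnSubscribe { println("OnSubscribe") } // provide some insight
+        .doFinally { println("Finally") } // ... into what's going on
+    source.open().use { channel -> // the channel is closed when this block exits
+        var cnt = 0
+        for (x in channel) { // iterate over the channel to receive elements from it
+            println(x)
+            if (++cnt >= 3) break // break when 3 elements are printed
+        }
+    }
+}
+```
+
+<!--- CLEAR -->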
+
+We do not need to use an explicit `close` if iteration is performed over all the items that are emitted
+by the publisher:
+
+<!--- INCLUDE
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+-->
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val source = Flowable.range(1, 5) // a range of five numbers
+ .doOnSubscribe { println("OnSubscribe") } // provide some insight
+ .doFinally { println("Finally") } // ... into what's going on
+ // iterate over the source fully
+ for (x in source) println(x)
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt)
+
+We get the following output:
+
+```text
+OnSubscribe
+1
+2
+3
+4
+Finally
+5
+```
+
+<!--- TEST -->
+
+Notice, how "Finally" is printed before the last element "5". It happens because our `main` function in this
+example is a coroutine that we start with [runBlocking] coroutine builder.
+Our main coroutine receives on the channel using `for (x in source)` statement.
+The main coroutine is _suspended_ while it waits for the source to emit an item.
+When the last item is emitted by `Flowable.range(1, 5)` it
+_resumes_ the main coroutine, which gets dispatched onto the main thread to print this
+last element at a later point in time, while the source completes and prints "Finally".
+
+### Backpressure
+
+Backpressure is one of the most interesting and complex aspects of reactive streams. Coroutines can
+_suspend_ and they provide a natural answer to handling backpressure.
+
+In Rx Java 2.x a backpressure-capable class is called
+[Flowable](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html).
+In the following example we use [rxFlowable] coroutine builder from `kotlinx-coroutines-rx2` module to define a
+flowable that sends five integers from 1 to 5.
+It prints a message to the output before invocation of
+suspending [send][SendChannel.send] function, so that we can study how it operates.
+
+The integers are generated in [CommonPool], but subscription is shifted
+to another thread using Rx
+[observeOn](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#observeOn(io.reactivex.Scheduler,%20boolean,%20int))
+operator with a buffer of size 1.
+The subscriber is slow. It takes 200 ms to process each item, which is simulated using `Thread.sleep`.
+
+<!--- INCLUDE
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.rx2.rxFlowable
+import io.reactivex.schedulers.Schedulers
+-->
+
+```kotlin
+fun main(args: Array<String>) {
+ // coroutine -- fast producer of elements in common pool
+ val source = rxFlowable(CommonPool) {
+ for (x in 1..5) {
+ println("Sending $x ...")
+ send(x) // this is a suspending function
+ }
+ }
+ // subscribe on another thread with a slow subscriber using Rx
+ source
+ .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
+ .doOnComplete { println("Complete") }
+ .subscribe { x ->
+ println("Received $x")
+ Thread.sleep(200) // 200 ms to process each item
+ }
+ Thread.sleep(2000) // hold on main thread for a couple of seconds
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt)
+
+The output of this code nicely illustrates how backpressure works with coroutines:
+
+```text
+Sending 1 ...
+Sending 2 ...
+Received 1
+Sending 3 ...
+Received 2
+Sending 4 ...
+Received 3
+Sending 5 ...
+Received 4
+Received 5
+Complete
+```
+
+<!--- TEST -->
+
+We see here how the producer coroutine puts the first element in the buffer and is suspended while trying to send
+another one. Only after the consumer receives the first item does the sender resume to produce more.
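+
+The same suspension mechanism can be observed without any reactive library. The following is a minimal sketch that
+replaces the Rx pipeline with a plain buffered [Channel] of capacity 1. The capacity and the 200 ms delay are chosen
+only to mirror the example above, and the exact interleaving of the printed lines may differ from the Rx version.
+
+```kotlin
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+    val channel = Channel<Int>(1) // buffered channel that holds at most one element
+    launch(context) { // fast producer, a child of the main coroutine
+        for (x in 1..5) {
+            println("Sending $x ...")
+            channel.send(x) // suspends while the buffer is full
+        }
+        channel.close() // signal completion to the consumer
+    }
+    for (x in channel) { // slow consumer
+        println("Received $x")
+        delay(200) // 200 ms to process each item
+    }
+    println("Complete")
+}
+```
+
+<!--- CLEAR -->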
+
+## Operators
+
+Full-featured reactive stream libraries, like Rx, come with
+[a very large set of operators](http://reactivex.io/documentation/operators.html) to create, transform, combine
+and otherwise process the corresponding streams. Creating your own operators with support for
+back-pressure is [notoriously](http://akarnokd.blogspot.ru/2015/05/pitfalls-of-operator-implementations.html)
+[difficult](https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0).
+
+Coroutines and channels are designed to provide an opposite experience. There are no built-in operators,
+but processing streams of elements is extremely simple and back-pressure is supported automatically
+without you having to explicitly think about it.
+
+This section shows coroutine-based implementation of several reactive stream operators.
+
+### Range
+
+Let's roll out our own implementation of
+[range](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#range(int,%20int))
+operator for reactive streams `Publisher` interface. The asynchronous clean-slate implementation of this operator for
+reactive streams is explained in
+[this blog post](http://akarnokd.blogspot.ru/2017/03/java-9-flow-api-asynchronous-integer.html).
+It takes a lot of code.
+Here is the corresponding code with coroutines:
+
+<!--- INCLUDE
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import kotlin.coroutines.experimental.CoroutineContext
+-->
+
+```kotlin
+fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
+ for (x in start until start + count) send(x)
+}
+```
+
+In this code `CoroutineContext` is used instead of an `Executor` and all the backpressure aspects are taken care
+of by the coroutines machinery. Note, that this implementation depends only on the small reactive streams library
+that defines `Publisher` interface and its friends.
+
+It is straightforward to use from a coroutine:
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ for (x in range(CommonPool, 1, 5)) println(x)
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt)
+
+The result of this code is quite expected:
+
+```text
+1
+2
+3
+4
+5
+```
+
+<!--- TEST -->
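+
+Since `range` returns a plain reactive streams `Publisher`, it is not tied to coroutines on the consuming side.
+As a minimal sketch, assuming RxJava 2.x is on the classpath, the same publisher can be wrapped with
+`Flowable.fromPublisher` and processed with ordinary Rx operators. Here it prints the squares of the numbers
+from 1 to 5:
+
+```kotlin
+import io.reactivex.Flowable
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import kotlin.coroutines.experimental.CoroutineContext
+
+fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
+    for (x in start until start + count) send(x)
+}
+
+fun main(args: Array<String>) {
+    Flowable.fromPublisher(range(CommonPool, 1, 5)) // adapt the coroutine-based publisher to Rx
+        .map { it * it } // a regular Rx operator
+        .blockingForEach { println(it) } // block the main thread until the stream completes
+}
+```
+
+<!--- CLEAR -->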
+
+### Fused filter-map hybrid
+
+Reactive operators like
+[filter](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#filter(io.reactivex.functions.Predicate)) and
+[map](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#map(io.reactivex.functions.Function))
+are trivial to implement with coroutines. For a bit of challenge and showcase, let us combine them
+into the single `fusedFilterMap` operator:
+
+<!--- INCLUDE
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import org.reactivestreams.Publisher
+import kotlin.coroutines.experimental.CoroutineContext
+-->
+
+```kotlin
+fun <T, R> Publisher<T>.fusedFilterMap(
+ context: CoroutineContext, // the context to execute this coroutine in
+ predicate: (T) -> Boolean, // the filter predicate
+ mapper: (T) -> R // the mapper function
+) = publish<R>(context) {
+ for (x in this@fusedFilterMap) // iterate over the source stream
+ if (predicate(x)) // filter part
+ send(mapper(x)) // map part
+}
+```
+
+Using `range` from the previous example we can test our `fusedFilterMap`
+by filtering for even numbers and mapping them to strings:
+
+<!--- INCLUDE
+
+fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
+ for (x in start until start + count) send(x)
+}
+-->
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val result = range(context, 1, 5)
+ .fusedFilterMap(context, { it % 2 == 0 }, { "$it is even" })
+ for (x in result) println(x) // print all strings from result
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt)
+
+It is not hard to see, that the result is going to be:
+
+```text
+2 is even
+4 is even
+```
+
+<!--- TEST -->
+
+### Take until
+
+Let's implement our own version of
+[takeUntil](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#takeUntil(org.reactivestreams.Publisher))
+operator. It is quite a [tricky one](http://akarnokd.blogspot.ru/2015/05/pitfalls-of-operator-implementations.html)
+to implement, because of the need to track and manage subscription to two streams.
+We need to relay all the elements from the source stream until the other stream either completes or
+emits anything. However, we have [select] expression to rescue us in coroutines implementation:
+
+<!--- INCLUDE
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import org.reactivestreams.Publisher
+import kotlin.coroutines.experimental.CoroutineContext
+import kotlinx.coroutines.experimental.selects.whileSelect
+-->
+
+```kotlin
+fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
+ this@takeUntil.open().use { thisChannel -> // open channel to Publisher<T>
+ other.open().use { otherChannel -> // open channel to Publisher<U>
+ whileSelect {
+ otherChannel.onReceive { false } // bail out on any received element from `other`
+ thisChannel.onReceive { send(it); true } // resend element from this channel and continue
+ }
+ }
+ }
+}
+```
+
+This code is using [whileSelect] as a nicer shortcut to `while(select{...}) {}` loop and Kotlin's
+[use](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html)
+expression to close the channels on exit, which unsubscribes from the corresponding publishers.
+
+The following hand-written combination of
+[range](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#range(int,%20int)) with
+[interval](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#interval(long,%20java.util.concurrent.TimeUnit,%20io.reactivex.Scheduler))
+is used for testing. It is coded using a `publish` coroutine builder
+(its pure-Rx implementation is shown in later sections):
+
+```kotlin
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+ for (x in start until start + count) {
+ delay(time) // wait before sending each number
+ send(x)
+ }
+}
+```
+
+The following code shows how `takeUntil` works:
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val slowNums = rangeWithInterval(context, 200, 1, 10) // numbers with 200ms interval
+ val stop = rangeWithInterval(context, 500, 1, 10) // the first one after 500ms
+ for (x in slowNums.takeUntil(context, stop)) println(x) // let's test it
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt)
+
+Producing
+
+```text
+1
+2
+```
+
+<!--- TEST -->
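+
+To make the role of [whileSelect] in `takeUntil` more explicit, here is a sketch of the same operator written with a
+plain [select] expression in a loop. The name `takeUntilExpanded` and the `proceed` variable are hypothetical and
+used only for illustration. The behaviour is intended to match the `takeUntil` implementation above.
+
+```kotlin
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import kotlinx.coroutines.experimental.selects.select
+import org.reactivestreams.Publisher
+import kotlin.coroutines.experimental.CoroutineContext
+
+// hypothetical name: the same logic as takeUntil, with whileSelect spelled out
+fun <T, U> Publisher<T>.takeUntilExpanded(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
+    this@takeUntilExpanded.open().use { thisChannel -> // open channel to Publisher<T>
+        other.open().use { otherChannel -> // open channel to Publisher<U>
+            var proceed = true
+            while (proceed) { // repeat select until one of its clauses returns false
+                proceed = select<Boolean> {
+                    otherChannel.onReceive { false } // bail out on any received element from `other`
+                    thisChannel.onReceive { send(it); true } // resend element from this channel and continue
+                }
+            }
+        }
+    }
+}
+```
+
+<!--- CLEAR -->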
+
+### Merge
+
+There are always at least two ways for processing multiple streams of data with coroutines. One way involving
+[select] was shown in the previous example. The other way is just to launch multiple coroutines. Let
+us implement
+[merge](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#merge(org.reactivestreams.Publisher))
+operator using the latter approach:
+
+<!--- INCLUDE
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import org.reactivestreams.Publisher
+import kotlin.coroutines.experimental.CoroutineContext
+-->
+
+```kotlin
+fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
+ for (pub in this@merge) { // for each publisher received on the source channel
+ launch(this.context) { // launch a child coroutine
+ for (x in pub) send(x) // resend all elements from this publisher
+ }
+ }
+}
+```
+
+Notice, the use of `this.context` in the invocation of [launch] coroutine builder. It is used to refer
+to the [CoroutineScope.context] that is provided by [publish] builder. This way, all the coroutines that are
+being launched here are [children](../coroutines-guide.md#children-of-a-coroutine) of the `publish`
+coroutine and will get cancelled when the `publish` coroutine is cancelled or is otherwise completed.
+This implementation completes as soon as the original publisher completes.
+
+For a test, let us start with `rangeWithInterval` function from the previous example and write a
+producer that sends its results twice with some delay:
+
+<!--- INCLUDE
+
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+ for (x in start until start + count) {
+ delay(time) // wait before sending each number
+ send(x)
+ }
+}
+-->
+
+```kotlin
+fun testPub(context: CoroutineContext) = publish<Publisher<Int>>(context) {
+ send(rangeWithInterval(context, 250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms
+ delay(100) // wait for 100 ms
+ send(rangeWithInterval(context, 500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms
+ delay(1100) // wait for 1.1s - done in 1.2 sec after start
+}
+```
+
+The test code is to use `merge` on `testPub` and to display the results:
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ for (x in testPub(context).merge(context)) println(x) // print the whole stream
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt)
+
+And the results should be:
+
+```text
+1
+2
+11
+3
+4
+12
+```
+
+<!--- TEST -->
+
+## Coroutine context
+
+All the example operators that are shown in the previous section have an explicit
+[CoroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/-coroutine-context/)
+parameter. In Rx world it roughly corresponds to
+a [Scheduler](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Scheduler.html).
+
+### Threads with Rx
+
+The following example shows the basics of threading context management with Rx.
+Here `rangeWithIntervalRx` is an implementation of `rangeWithInterval` function using Rx
+`zip`, `range`, and `interval` operators.
+
+<!--- INCLUDE
+import io.reactivex.*
+import io.reactivex.functions.BiFunction
+import io.reactivex.schedulers.Schedulers
+import java.util.concurrent.TimeUnit
+-->
+
+```kotlin
+fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
+ Flowable.zip(
+ Flowable.range(start, count),
+ Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
+ BiFunction { x, _ -> x })
+
+fun main(args: Array<String>) {
+ rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3).subscribe { x ->
+ println("$x on thread ${Thread.currentThread().name}")
+ }
+ Thread.sleep(1000)
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt)
+
+We are explicitly passing the
+[Schedulers.computation()](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/Schedulers.html#computation())
+scheduler to our `rangeWithIntervalRx` operator and
+it is going to be executed in Rx computation thread pool. The output is going to be similar to the following one:
+
+```text
+1 on thread RxComputationThreadPool-1
+2 on thread RxComputationThreadPool-1
+3 on thread RxComputationThreadPool-1
+```
+
+<!--- TEST FLEXIBLE_THREAD -->
+
+### Threads with coroutines
+
+In the world of coroutines `Schedulers.computation()` roughly corresponds to [CommonPool],
+so the previous example is similar to the following one:
+
+<!--- INCLUDE
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import kotlin.coroutines.experimental.CoroutineContext
+-->
+
+```kotlin
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+ for (x in start until start + count) {
+ delay(time) // wait before sending each number
+ send(x)
+ }
+}
+
+fun main(args: Array<String>) {
+ Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
+ .subscribe { x ->
+ println("$x on thread ${Thread.currentThread().name}")
+ }
+ Thread.sleep(1000)
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt)
+
+The produced output is going to be similar to:
+
+```text
+1 on thread ForkJoinPool.commonPool-worker-1
+2 on thread ForkJoinPool.commonPool-worker-1
+3 on thread ForkJoinPool.commonPool-worker-1
+```
+
+<!--- TEST LINES_START -->
+
+Here we've used Rx
+[subscribe](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#subscribe(io.reactivex.functions.Consumer))
+operator that does not have its own scheduler and operates on the same thread as the publisher -- on a `CommonPool`
+in this example.
+
+### Rx observeOn
+
+In Rx you use special operators to modify the threading context for operations in the chain. You
+can find some [good guides](http://tomstechnicalblog.blogspot.ru/2016/02/rxjava-understanding-observeon-and.html)
+about them, if you are not familiar.
+
+For example, there is
+[observeOn](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#observeOn(io.reactivex.Scheduler))
+operator. Let us modify the previous example to observe using `Schedulers.computation()`:
+
+<!--- INCLUDE
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.schedulers.Schedulers
+import kotlin.coroutines.experimental.CoroutineContext
+-->
+
+```kotlin
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+ for (x in start until start + count) {
+ delay(time) // wait before sending each number
+ send(x)
+ }
+}
+
+fun main(args: Array<String>) {
+ Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
+ .observeOn(Schedulers.computation()) // <-- THIS LINE IS ADDED
+ .subscribe { x ->
+ println("$x on thread ${Thread.currentThread().name}")
+ }
+ Thread.sleep(1000)
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt)
+
+Here is the difference in output, notice "RxComputationThreadPool":
+
+```text
+1 on thread RxComputationThreadPool-1
+2 on thread RxComputationThreadPool-1
+3 on thread RxComputationThreadPool-1
+```
+
+<!--- TEST FLEXIBLE_THREAD -->
+
+### Coroutine context to rule them all
+
+A coroutine is always working in some context. For example, let us start a coroutine
+in the main thread with [runBlocking] and iterate over the result of the Rx version of `rangeWithIntervalRx` operator,
+instead of using Rx `subscribe` operator:
+
+<!--- INCLUDE
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.functions.BiFunction
+import io.reactivex.schedulers.Schedulers
+import java.util.concurrent.TimeUnit
+-->
+
+```kotlin
+fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
+ Flowable.zip(
+ Flowable.range(start, count),
+ Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
+ BiFunction { x, _ -> x })
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
+ println("$x on thread ${Thread.currentThread().name}")
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt)
+
+The resulting messages are going to be printed in the main thread:
+
+```text
+1 on thread main
+2 on thread main
+3 on thread main
+```
+
+<!--- TEST LINES_START -->
+
+### Unconfined context
+
+Most Rx operators do not have any specific thread (scheduler) associated with them and are working
+in whatever thread that they happen to be invoked in. We've seen it on the example of `subscribe` operator
+in the [threads with Rx](#threads-with-rx) section.
+
+In the world of coroutines, [Unconfined] context serves a similar role. Let us modify our previous example,
+but instead of iterating over the source `Flowable` from the `runBlocking` coroutine that is confined
+to the main thread, we launch a new coroutine in `Unconfined` context, while the main coroutine
+simply waits for its completion using [Job.join]:
+
+<!--- INCLUDE
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.functions.BiFunction
+import io.reactivex.schedulers.Schedulers
+import java.util.concurrent.TimeUnit
+-->
+
+```kotlin
+fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
+ Flowable.zip(
+ Flowable.range(start, count),
+ Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
+ BiFunction { x, _ -> x })
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val job = launch(Unconfined) { // launch new coroutine in Unconfined context (without its own thread pool)
+ for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
+ println("$x on thread ${Thread.currentThread().name}")
+ }
+ job.join() // wait for our coroutine to complete
+}
+```
+
+> You can get full code [here](kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt)
+
+Now the output shows that the code of the coroutine is executing in the Rx computation thread pool, just
+like our initial example using the Rx `subscribe` operator:
+
+```text
+1 on thread RxComputationThreadPool-1
+2 on thread RxComputationThreadPool-1
+3 on thread RxComputationThreadPool-1
+```
+
+<!--- TEST LINES_START -->
+
+Note that the [Unconfined] context should be used with care. It may improve the overall performance in certain tests,
+due to the increased stack-locality of operations and less scheduling overhead, but it also produces deeper stacks
+and makes it harder to reason about the asynchrony of the code that is using it.
+
+If a coroutine sends an element to a channel, then the thread that invoked
+[send][SendChannel.send] may start executing the code of a consumer coroutine that uses the [Unconfined] dispatcher.
+The original producer coroutine that invoked `send` is paused until the unconfined consumer coroutine hits its next
+suspension point. This is very similar to the lock-step single-threaded `onNext` execution in the Rx world in the absence
+of thread-shifting operators. It is a normal default for Rx, because operators usually do very small chunks
+of work and you have to combine many operators for complex processing. However, this is unusual with coroutines,
+where you can have arbitrarily complex processing in a single coroutine. Usually, you only need to chain stream-processing
+coroutines for complex pipelines with fan-in and fan-out between multiple worker coroutines.
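+
+The lock-step behavior described above can be observed with a minimal sketch like the one below. It is an
+illustration only and is not one of the generated guide examples: the helper name `lockStepDemo` is hypothetical,
+and the sketch assumes the same `kotlinx.coroutines.experimental` API that the rest of this guide uses.
+A producer running in [CommonPool] sends to a rendezvous [Channel], while an [Unconfined] consumer records
+the thread on which it receives each element:
+
+```kotlin
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+
+// Hypothetical helper: returns each received element paired with the name of the receiving thread
+fun lockStepDemo(): List<Pair<Int, String>> = runBlocking {
+    val channel = Channel<Int>() // rendezvous channel, no buffer
+    val received = ArrayList<Pair<Int, String>>()
+    // the unconfined consumer starts in the current thread and suspends on its first receive
+    val consumer = launch(Unconfined) {
+        for (x in channel) received.add(x to Thread.currentThread().name)
+    }
+    // the producer runs in the common pool; each send may resume the consumer in the sending thread
+    val producer = launch(CommonPool) {
+        for (x in 1..3) channel.send(x)
+        channel.close() // lets the consumer loop terminate
+    }
+    producer.join()
+    consumer.join()
+    received
+}
+
+fun main(args: Array<String>) {
+    for ((x, thread) in lockStepDemo()) println("$x received on thread $thread")
+}
+```
+
+With this setup the reported thread names are expected to belong to the producer's pool rather than to the
+main thread, although the exact names depend on the runtime and are not guaranteed.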
+
+<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core -->
+<!--- DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
+<!--- INDEX kotlinx.coroutines.experimental -->
+[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
+[CommonPool]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
+[launch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
+[CoroutineScope.context]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/context.html
+[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
+[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html
+<!--- INDEX kotlinx.coroutines.experimental.channels -->
+[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
+[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
+[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
+[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html
+[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
+<!--- INDEX kotlinx.coroutines.experimental.selects -->
+[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
+[whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/while-select.html
+<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive -->
+<!--- DOCS_ROOT reactive/kotlinx-coroutines-reactive/target/dokka/kotlinx-coroutines-reactive -->
+<!--- INDEX kotlinx.coroutines.experimental.reactive -->
+[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/publish.html
+[org.reactivestreams.Publisher.open]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/open.html
+[SubscriptionReceiveChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/-subscription-receive-channel/close.html
+<!--- SITE_ROOT https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2 -->
+<!--- DOCS_ROOT reactive/kotlinx-coroutines-rx2/target/dokka/kotlinx-coroutines-rx2 -->
+<!--- INDEX kotlinx.coroutines.experimental.rx2 -->
+[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-flowable.html
+<!--- END -->
+
+
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt
new file mode 100644
index 0000000..8a2acb9
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-01.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.basic.example01
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+ val source = produce<Int>(context) {
+ println("Begin") // mark the beginning of this coroutine in output
+ for (x in 1..6) {
+ delay(200) // wait for 200ms
+ send(x) // send number x to the channel
+ }
+ }
+ // print the first 3 elements from this channel
+ println("First three:")
+ var cnt = 0
+ for (x in source) { // iterate over the source to receive elements from it
+ println(x)
+ if (++cnt >= 3) break // break when 3 elements are printed
+ }
+ // print the remaining elements from this source
+ println("Remaining:")
+ for (x in source) {
+ println(x)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt
new file mode 100644
index 0000000..5c31b3e
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-02.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.basic.example02
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ // create a channel that produces numbers from 1 to 6 with 200ms delays between them
+ val source = publish<Int>(context) {
+ // ^^^^^^^ <--- Difference from the previous examples is here
+ println("Begin") // mark the beginning of this coroutine in output
+ for (x in 1..6) {
+ delay(200) // wait for 200ms
+ send(x) // send number x to the channel
+ }
+ }
+ // print the first 3 elements from this channel
+ println("First three:")
+ var cnt = 0
+ for (x in source) { // iterate over the source to receive elements from it
+ println(x)
+ if (++cnt >= 3) break // break when 3 elements are printed
+ }
+ // print the remaining elements from this source
+ println("Remaining:")
+ for (x in source) {
+ println(x)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt
new file mode 100644
index 0000000..95c098d
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.basic.example03
+
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val source = Flowable.range(1, 3) // Rx 2.x operator to produce a range of integers
+ // iterate over the source once
+ for (x in source) println("First pass $x")
+ // iterate over the source again
+ for (x in source) println("Second pass $x")
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt
new file mode 100644
index 0000000..b00da76
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.basic.example04
+
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val source = Flowable.range(1, 5) // a range of five numbers
+ .doOnSubscribe { println("OnSubscribe") } // provide some insight
+ .doFinally { println("Finally") } // ... into what's going on
+ var cnt = 0
+ val channel = source.open() // open channel to the source
+ for (x in channel) { // iterate over the channel to receive elements from it
+ println(x)
+ if (++cnt >= 3) break // break when 3 elements are printed
+ }
+ channel.close() // close the channel
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt
new file mode 100644
index 0000000..8fe2b5b
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-05.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.basic.example05
+
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val source = Flowable.range(1, 5) // a range of five numbers
+ .doOnSubscribe { println("OnSubscribe") } // provide some insight
+ .doFinally { println("Finally") } // ... into what's going on
+ // iterate over the source fully
+ for (x in source) println(x)
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt
new file mode 100644
index 0000000..d6fd630
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-06.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.basic.example06
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.rx2.rxFlowable
+import io.reactivex.schedulers.Schedulers
+
+fun main(args: Array<String>) {
+ // coroutine -- fast producer of elements in common pool
+ val source = rxFlowable(CommonPool) {
+ for (x in 1..5) {
+ println("Sending $x ...")
+ send(x) // this is a suspending function
+ }
+ }
+ // subscribe on another thread with a slow subscriber using Rx
+ source
+ .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
+ .doOnComplete { println("Complete") }
+ .subscribe { x ->
+ println("Received $x")
+ Thread.sleep(200) // 200 ms to process each item
+ }
+ Thread.sleep(2000) // hold the main thread for a couple of seconds
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt
new file mode 100644
index 0000000..02c94c4
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-01.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.context.example01
+
+import io.reactivex.*
+import io.reactivex.functions.BiFunction
+import io.reactivex.schedulers.Schedulers
+import java.util.concurrent.TimeUnit
+
+fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
+ Flowable.zip(
+ Flowable.range(start, count),
+ Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
+ BiFunction { x, _ -> x })
+
+fun main(args: Array<String>) {
+ rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3).subscribe { x ->
+ println("$x on thread ${Thread.currentThread().name}")
+ }
+ Thread.sleep(1000)
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt
new file mode 100644
index 0000000..b26ee52
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-02.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.context.example02
+
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import kotlin.coroutines.experimental.CoroutineContext
+
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+ for (x in start until start + count) {
+ delay(time) // wait before sending each number
+ send(x)
+ }
+}
+
+fun main(args: Array<String>) {
+ Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
+ .subscribe { x ->
+ println("$x on thread ${Thread.currentThread().name}")
+ }
+ Thread.sleep(1000)
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt
new file mode 100644
index 0000000..6534fa9
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-03.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.context.example03
+
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.schedulers.Schedulers
+import kotlin.coroutines.experimental.CoroutineContext
+
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+ for (x in start until start + count) {
+ delay(time) // wait before sending each number
+ send(x)
+ }
+}
+
+fun main(args: Array<String>) {
+ Flowable.fromPublisher(rangeWithInterval(CommonPool, 100, 1, 3))
+ .observeOn(Schedulers.computation()) // <-- THIS LINE IS ADDED
+ .subscribe { x ->
+ println("$x on thread ${Thread.currentThread().name}")
+ }
+ Thread.sleep(1000)
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt
new file mode 100644
index 0000000..91e72dc
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-04.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.context.example04
+
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.functions.BiFunction
+import io.reactivex.schedulers.Schedulers
+import java.util.concurrent.TimeUnit
+
+fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
+ Flowable.zip(
+ Flowable.range(start, count),
+ Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
+ BiFunction { x, _ -> x })
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
+ println("$x on thread ${Thread.currentThread().name}")
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt
new file mode 100644
index 0000000..8b3dbfd
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-context-05.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.context.example05
+
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import io.reactivex.functions.BiFunction
+import io.reactivex.schedulers.Schedulers
+import java.util.concurrent.TimeUnit
+
+fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
+ Flowable.zip(
+ Flowable.range(start, count),
+ Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
+ BiFunction { x, _ -> x })
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val job = launch(Unconfined) { // launch new coroutine in Unconfined context (without its own thread pool)
+ for (x in rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3))
+ println("$x on thread ${Thread.currentThread().name}")
+ }
+ job.join() // wait for our coroutine to complete
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt
new file mode 100644
index 0000000..217a7ef
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-01.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.operators.example01
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import kotlin.coroutines.experimental.CoroutineContext
+
+fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
+ for (x in start until start + count) send(x)
+}
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ for (x in range(CommonPool, 1, 5)) println(x)
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt
new file mode 100644
index 0000000..7cb3484
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-02.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.operators.example02
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import org.reactivestreams.Publisher
+import kotlin.coroutines.experimental.CoroutineContext
+
+fun <T, R> Publisher<T>.fusedFilterMap(
+ context: CoroutineContext, // the context to execute this coroutine in
+ predicate: (T) -> Boolean, // the filter predicate
+ mapper: (T) -> R // the mapper function
+) = publish<R>(context) {
+ for (x in this@fusedFilterMap) // iterate over the source stream
+ if (predicate(x)) // filter part
+ send(mapper(x)) // map part
+}
+
+fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
+ for (x in start until start + count) send(x)
+}
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val result = range(context, 1, 5)
+ .fusedFilterMap(context, { it % 2 == 0}, { "$it is even" })
+ for (x in result) println(x) // print all strings from result
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt
new file mode 100644
index 0000000..2ece476
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-03.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.operators.example03
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import org.reactivestreams.Publisher
+import kotlin.coroutines.experimental.CoroutineContext
+import kotlinx.coroutines.experimental.selects.whileSelect
+
+fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
+ this@takeUntil.open().use { thisChannel -> // open channel to Publisher<T>
+ other.open().use { otherChannel -> // open channel to Publisher<U>
+ whileSelect {
+ otherChannel.onReceive { false } // bail out on any received element from `other`
+ thisChannel.onReceive { send(it); true } // resend element from this channel and continue
+ }
+ }
+ }
+}
+
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+ for (x in start until start + count) {
+ delay(time) // wait before sending each number
+ send(x)
+ }
+}
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val slowNums = rangeWithInterval(context, 200, 1, 10) // numbers with 200ms interval
+ val stop = rangeWithInterval(context, 500, 1, 10) // the first one after 500ms
+ for (x in slowNums.takeUntil(context, stop)) println(x) // let's test it
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt
new file mode 100644
index 0000000..76eb713
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-operators-04.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.reactive.operators.example04
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import org.reactivestreams.Publisher
+import kotlin.coroutines.experimental.CoroutineContext
+
+fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
+ for (pub in this@merge) { // for each publisher received on the source channel
+ launch(this.context) { // launch a child coroutine
+ for (x in pub) send(x) // resend all elements from this publisher
+ }
+ }
+}
+
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+ for (x in start until start + count) {
+ delay(time) // wait before sending each number
+ send(x)
+ }
+}
+
+fun testPub(context: CoroutineContext) = publish<Publisher<Int>>(context) {
+ send(rangeWithInterval(context, 250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms
+ delay(100) // wait for 100 ms
+ send(rangeWithInterval(context, 500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms
+ delay(1100) // wait for 1.1s, so this publisher is done 1.2s after start
+}
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ for (x in testPub(context).merge(context)) println(x) // print the whole stream
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
new file mode 100644
index 0000000..6ce61be
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt
@@ -0,0 +1,178 @@
+// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
+package guide.test
+
+import org.junit.Test
+
+class GuideReactiveTest {
+
+ @Test
+ fun testGuideReactiveBasicExample01() {
+ test { guide.reactive.basic.example01.main(emptyArray()) }.verifyLines(
+ "First three:",
+ "Begin",
+ "1",
+ "2",
+ "3",
+ "Remaining:",
+ "4",
+ "5",
+ "6"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveBasicExample02() {
+ test { guide.reactive.basic.example02.main(emptyArray()) }.verifyLines(
+ "First three:",
+ "Begin",
+ "1",
+ "2",
+ "3",
+ "Remaining:",
+ "Begin",
+ "1",
+ "2",
+ "3",
+ "4",
+ "5",
+ "6"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveBasicExample03() {
+ test { guide.reactive.basic.example03.main(emptyArray()) }.verifyLines(
+ "First pass 1",
+ "First pass 2",
+ "First pass 3",
+ "Second pass 1",
+ "Second pass 2",
+ "Second pass 3"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveBasicExample04() {
+ test { guide.reactive.basic.example04.main(emptyArray()) }.verifyLines(
+ "OnSubscribe",
+ "1",
+ "2",
+ "3",
+ "Finally"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveBasicExample05() {
+ test { guide.reactive.basic.example05.main(emptyArray()) }.verifyLines(
+ "OnSubscribe",
+ "1",
+ "2",
+ "3",
+ "4",
+ "Finally",
+ "5"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveBasicExample06() {
+ test { guide.reactive.basic.example06.main(emptyArray()) }.verifyLines(
+ "Sending 1 ...",
+ "Sending 2 ...",
+ "Received 1",
+ "Sending 3 ...",
+ "Received 2",
+ "Sending 4 ...",
+ "Received 3",
+ "Sending 5 ...",
+ "Received 4",
+ "Received 5",
+ "Complete"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveOperatorsExample01() {
+ test { guide.reactive.operators.example01.main(emptyArray()) }.verifyLines(
+ "1",
+ "2",
+ "3",
+ "4",
+ "5"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveOperatorsExample02() {
+ test { guide.reactive.operators.example02.main(emptyArray()) }.verifyLines(
+ "2 is even",
+ "4 is even"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveOperatorsExample03() {
+ test { guide.reactive.operators.example03.main(emptyArray()) }.verifyLines(
+ "1",
+ "2"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveOperatorsExample04() {
+ test { guide.reactive.operators.example04.main(emptyArray()) }.verifyLines(
+ "1",
+ "2",
+ "11",
+ "3",
+ "4",
+ "12"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveContextExample01() {
+ test { guide.reactive.context.example01.main(emptyArray()) }.verifyLinesFlexibleThread(
+ "1 on thread RxComputationThreadPool-1",
+ "2 on thread RxComputationThreadPool-1",
+ "3 on thread RxComputationThreadPool-1"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveContextExample02() {
+ test { guide.reactive.context.example02.main(emptyArray()) }.verifyLinesStart(
+ "1 on thread ForkJoinPool.commonPool-worker-1",
+ "2 on thread ForkJoinPool.commonPool-worker-1",
+ "3 on thread ForkJoinPool.commonPool-worker-1"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveContextExample03() {
+ test { guide.reactive.context.example03.main(emptyArray()) }.verifyLinesFlexibleThread(
+ "1 on thread RxComputationThreadPool-1",
+ "2 on thread RxComputationThreadPool-1",
+ "3 on thread RxComputationThreadPool-1"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveContextExample04() {
+ test { guide.reactive.context.example04.main(emptyArray()) }.verifyLinesStart(
+ "1 on thread main",
+ "2 on thread main",
+ "3 on thread main"
+ )
+ }
+
+ @Test
+ fun testGuideReactiveContextExample05() {
+ test { guide.reactive.context.example05.main(emptyArray()) }.verifyLinesStart(
+ "1 on thread RxComputationThreadPool-1",
+ "2 on thread RxComputationThreadPool-1",
+ "3 on thread RxComputationThreadPool-1"
+ )
+ }
+}