Integration with RxJava3 (#1950)

Fixes #1883

Co-authored-by: Zac Sweers <zsweers@slack-corp.com>
diff --git a/README.md b/README.md
index 62a7dc8..d829dfe 100644
--- a/README.md
+++ b/README.md
@@ -47,6 +47,7 @@
   * Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [publish], etc), 
   * Flow (JDK 9) (the same interface as for Reactive Streams),
   * RxJava 2.x ([rxFlowable], [rxSingle], etc), and
+  * RxJava 3.x ([rxFlowable], [rxSingle], etc), and
   * Project Reactor ([flux], [mono], etc).
 * [ui](ui/README.md) &mdash; modules that provide coroutine dispatchers for various single-threaded UI libraries:
   * Android, JavaFX, and Swing.
@@ -288,6 +289,8 @@
 <!--- INDEX kotlinx.coroutines.rx2 -->
 [rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
 [rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-single.html
+<!--- MODULE kotlinx-coroutines-rx2 -->
+<!--- INDEX kotlinx.coroutines.rx2 -->
 <!--- MODULE kotlinx-coroutines-reactor -->
 <!--- INDEX kotlinx.coroutines.reactor -->
 [flux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/flux.html
diff --git a/docs/flow.md b/docs/flow.md
index 705f338..37e491c 100644
--- a/docs/flow.md
+++ b/docs/flow.md
@@ -1786,7 +1786,7 @@
 be Kotlin and suspension friendly and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article.
 
 While being different, conceptually, Flow *is* a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa.
-Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2` for RxJava2).
+Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2`/`kotlinx-coroutines-rx3` for RxJava2/RxJava3).
 Integration modules include conversions from and to `Flow`, integration with Reactor's `Context` and suspension-friendly ways to work with various reactive entities.
  
 <!-- stdlib references -->
diff --git a/gradle.properties b/gradle.properties
index 56a1a23..233ad36 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -18,6 +18,7 @@
 reactor_vesion=3.2.5.RELEASE
 reactive_streams_version=1.0.2
 rxjava2_version=2.2.8
+rxjava3_version=3.0.2
 javafx_version=11.0.2
 javafx_plugin_version=0.0.8
 binary_compatibility_validator_version=0.2.2
diff --git a/reactive/README.md b/reactive/README.md
index 8679a2b..35706ac 100644
--- a/reactive/README.md
+++ b/reactive/README.md
@@ -8,3 +8,4 @@
 * [kotlinx-coroutines-reactive](kotlinx-coroutines-reactive/README.md) -- utilities for [Reactive Streams](https://www.reactive-streams.org)
 * [kotlinx-coroutines-reactor](kotlinx-coroutines-reactor/README.md) -- utilities for [Reactor](https://projectreactor.io)
 * [kotlinx-coroutines-rx2](kotlinx-coroutines-rx2/README.md) -- utilities for [RxJava 2.x](https://github.com/ReactiveX/RxJava)
+* [kotlinx-coroutines-rx3](kotlinx-coroutines-rx3/README.md) -- utilities for [RxJava 3.x](https://github.com/ReactiveX/RxJava)
diff --git a/reactive/kotlinx-coroutines-rx2/build.gradle b/reactive/kotlinx-coroutines-rx2/build.gradle
index b583025..6d2c4ab 100644
--- a/reactive/kotlinx-coroutines-rx2/build.gradle
+++ b/reactive/kotlinx-coroutines-rx2/build.gradle
@@ -11,7 +11,7 @@
 
 tasks.withType(dokka.getClass()) {
     externalDocumentationLink {
-        url = new URL('http://reactivex.io/RxJava/javadoc/')
+        url = new URL('http://reactivex.io/RxJava/2.x/javadoc/')
         packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
     }
 }
diff --git a/reactive/kotlinx-coroutines-rx3/README.md b/reactive/kotlinx-coroutines-rx3/README.md
new file mode 100644
index 0000000..3aa73eb
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/README.md
@@ -0,0 +1,84 @@
+# Module kotlinx-coroutines-rx3
+
+Utilities for [RxJava 3.x](https://github.com/ReactiveX/RxJava).
+
+Coroutine builders:
+
+| **Name**        | **Result**                              | **Scope**        | **Description**
+| --------------- | --------------------------------------- | ---------------- | ---------------
+| [rxCompletable] | `Completable`                           | [CoroutineScope] | Cold completable that starts coroutine on subscribe
+| [rxMaybe]       | `Maybe`                                 | [CoroutineScope] | Cold maybe that starts coroutine on subscribe
+| [rxSingle]      | `Single`                                | [CoroutineScope] | Cold single that starts coroutine on subscribe
+| [rxObservable]  | `Observable`                            | [ProducerScope]  | Cold observable that starts coroutine on subscribe
+| [rxFlowable]    | `Flowable`                              | [ProducerScope]  | Cold observable that starts coroutine on subscribe with **backpressure** support 
+
+Integration with [Flow]:
+
+| **Name**                   | **Result**      | **Description**
+| ---------------            | --------------  | ---------------
+| [Flow.asFlowable]          | `Flowable`      | Converts the given flow to a cold Flowable.
+| [Flow.asObservable]        | `Observable`    | Converts the given flow to a cold Observable.
+| [ObservableSource.asFlow]  | `Flow`          | Converts the given cold ObservableSource to flow
+
+Suspending extension functions and suspending iteration:
+
+| **Name** | **Description**
+| -------- | ---------------
+| [CompletableSource.await][io.reactivex.rxjava3.core.CompletableSource.await] | Awaits for completion of the completable value 
+| [MaybeSource.await][io.reactivex.rxjava3.core.MaybeSource.await] | Awaits for the value of the maybe and returns it or null 
+| [MaybeSource.awaitOrDefault][io.reactivex.rxjava3.core.MaybeSource.awaitOrDefault] | Awaits for the value of the maybe and returns it or default 
+| [SingleSource.await][io.reactivex.rxjava3.core.SingleSource.await] | Awaits for completion of the single value and returns it 
+| [ObservableSource.awaitFirst][io.reactivex.rxjava3.core.ObservableSource.awaitFirst] | Awaits for the first value from the given observable
+| [ObservableSource.awaitFirstOrDefault][io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrDefault] | Awaits for the first value from the given observable or default
+| [ObservableSource.awaitFirstOrElse][io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrElse] | Awaits for the first value from the given observable or default from a function
+| [ObservableSource.awaitFirstOrNull][io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrNull] | Awaits for the first value from the given observable or null
+| [ObservableSource.awaitLast][io.reactivex.rxjava3.core.ObservableSource.awaitFirst] | Awaits for the last value from the given observable
+| [ObservableSource.awaitSingle][io.reactivex.rxjava3.core.ObservableSource.awaitSingle] | Awaits for the single value from the given observable
+
+Note that `Flowable` is a subclass of [Reactive Streams](https://www.reactive-streams.org)
+`Publisher` and extensions for it are covered by
+[kotlinx-coroutines-reactive](../kotlinx-coroutines-reactive) module.
+
+Conversion functions:
+
+| **Name** | **Description**
+| -------- | ---------------
+| [Job.asCompletable][kotlinx.coroutines.Job.asCompletable] | Converts job to hot completable
+| [Deferred.asSingle][kotlinx.coroutines.Deferred.asSingle] | Converts deferred value to hot single
+| [Scheduler.asCoroutineDispatcher][io.reactivex.rxjava3.core.Scheduler.asCoroutineDispatcher] | Converts scheduler to [CoroutineDispatcher]
+
+<!--- MODULE kotlinx-coroutines-core -->
+<!--- INDEX kotlinx.coroutines -->
+[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
+[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
+<!--- INDEX kotlinx.coroutines.channels -->
+[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-producer-scope/index.html
+<!--- INDEX kotlinx.coroutines.flow -->
+[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
+<!--- MODULE kotlinx-coroutines-rx3 -->
+<!--- INDEX kotlinx.coroutines.rx3 -->
+[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-completable.html
+[rxMaybe]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-maybe.html
+[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-single.html
+[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-observable.html
+[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-flowable.html
+[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.flow.-flow/as-flowable.html
+[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.flow.-flow/as-observable.html
+[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/as-flow.html
+[io.reactivex.rxjava3.core.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-completable-source/await.html
+[io.reactivex.rxjava3.core.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-maybe-source/await.html
+[io.reactivex.rxjava3.core.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-maybe-source/await-or-default.html
+[io.reactivex.rxjava3.core.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-single-source/await.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-default.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-else.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-null.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-single.html
+[kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.-job/as-completable.html
+[kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.-deferred/as-single.html
+[io.reactivex.rxjava3.core.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-scheduler/as-coroutine-dispatcher.html
+<!--- END -->
+
+# Package kotlinx.coroutines.rx3
+
+Utilities for [RxJava 3.x](https://github.com/ReactiveX/RxJava).
diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
new file mode 100644
index 0000000..27c3d3d
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
@@ -0,0 +1,70 @@
+public final class kotlinx/coroutines/rx3/RxAwaitKt {
+	public static final fun await (Lio/reactivex/rxjava3/core/CompletableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun await (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun await (Lio/reactivex/rxjava3/core/SingleSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun awaitFirst (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun awaitFirstOrDefault (Lio/reactivex/rxjava3/core/ObservableSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun awaitFirstOrElse (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun awaitFirstOrNull (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun awaitLast (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun awaitOrDefault (Lio/reactivex/rxjava3/core/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun awaitSingle (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+}
+
+public final class kotlinx/coroutines/rx3/RxChannelKt {
+	public static final fun collect (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun collect (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun openSubscription (Lio/reactivex/rxjava3/core/MaybeSource;)Lkotlinx/coroutines/channels/ReceiveChannel;
+	public static final fun openSubscription (Lio/reactivex/rxjava3/core/ObservableSource;)Lkotlinx/coroutines/channels/ReceiveChannel;
+}
+
+public final class kotlinx/coroutines/rx3/RxCompletableKt {
+	public static final fun rxCompletable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Completable;
+	public static synthetic fun rxCompletable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Completable;
+}
+
+public final class kotlinx/coroutines/rx3/RxConvertKt {
+	public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Completable;
+	public static final fun asFlow (Lio/reactivex/rxjava3/core/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Maybe;
+	public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Single;
+	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
+	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
+}
+
+public final class kotlinx/coroutines/rx3/RxFlowableKt {
+	public static final fun rxFlowable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Flowable;
+	public static synthetic fun rxFlowable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
+}
+
+public final class kotlinx/coroutines/rx3/RxMaybeKt {
+	public static final fun rxMaybe (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Maybe;
+	public static synthetic fun rxMaybe$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Maybe;
+}
+
+public final class kotlinx/coroutines/rx3/RxObservableKt {
+	public static final fun rxObservable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Observable;
+	public static synthetic fun rxObservable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
+}
+
+public final class kotlinx/coroutines/rx3/RxSchedulerKt {
+	public static final fun asCoroutineDispatcher (Lio/reactivex/rxjava3/core/Scheduler;)Lkotlinx/coroutines/rx3/SchedulerCoroutineDispatcher;
+}
+
+public final class kotlinx/coroutines/rx3/RxSingleKt {
+	public static final fun rxSingle (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Single;
+	public static synthetic fun rxSingle$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Single;
+}
+
+public final class kotlinx/coroutines/rx3/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay {
+	public fun <init> (Lio/reactivex/rxjava3/core/Scheduler;)V
+	public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
+	public fun equals (Ljava/lang/Object;)Z
+	public final fun getScheduler ()Lio/reactivex/rxjava3/core/Scheduler;
+	public fun hashCode ()I
+	public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
+	public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
+	public fun toString ()Ljava/lang/String;
+}
+
diff --git a/reactive/kotlinx-coroutines-rx3/build.gradle b/reactive/kotlinx-coroutines-rx3/build.gradle
new file mode 100644
index 0000000..ced694a
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/build.gradle
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+targetCompatibility = JavaVersion.VERSION_1_8
+
+dependencies {
+    compile project(':kotlinx-coroutines-reactive')
+    testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output
+    testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
+    compile "io.reactivex.rxjava3:rxjava:$rxjava3_version"
+}
+
+compileTestKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
+
+compileKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
+
+tasks.withType(dokka.getClass()) {
+    externalDocumentationLink {
+        url = new URL('http://reactivex.io/RxJava/3.x/javadoc/')
+        packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
+    }
+}
+
+task testNG(type: Test) {
+    useTestNG()
+    reports.html.destination = file("$buildDir/reports/testng")
+    include '**/*ReactiveStreamTckTest.*'
+    // Skip testNG when tests are filtered with --tests, otherwise it simply fails
+    onlyIf {
+        filter.includePatterns.isEmpty()
+    }
+    doFirst {
+        // Classic gradle, nothing works without doFirst
+        println "TestNG tests: ($includes)"
+    }
+}
+
+test {
+    dependsOn(testNG)
+    reports.html.destination = file("$buildDir/reports/junit")
+}
diff --git a/reactive/kotlinx-coroutines-rx3/package.list b/reactive/kotlinx-coroutines-rx3/package.list
new file mode 100644
index 0000000..889916d
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/package.list
@@ -0,0 +1,14 @@
+io.reactivex.rxjava3.core
+io.reactivex.rxjava3.annotations
+io.reactivex.rxjava3.disposables
+io.reactivex.rxjava3.exceptions
+io.reactivex.rxjava3.flowables
+io.reactivex.rxjava3.functions
+io.reactivex.rxjava3.observables
+io.reactivex.rxjava3.observers
+io.reactivex.rxjava3.parallel
+io.reactivex.rxjava3.plugins
+io.reactivex.rxjava3.processors
+io.reactivex.rxjava3.schedulers
+io.reactivex.rxjava3.subjects
+io.reactivex.rxjava3.subscribers
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
new file mode 100644
index 0000000..e52556e
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.Disposable
+import kotlinx.coroutines.CancellableContinuation
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlin.coroutines.*
+
+// ------------------------ CompletableSource ------------------------
+
+/**
+ * Awaits for completion of this completable without blocking a thread.
+ * Returns `Unit` or throws the corresponding exception if this completable had produced error.
+ *
+ * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
+ * suspending function is suspended, this function immediately resumes with [CancellationException].
+ */
+public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
+    subscribe(object : CompletableObserver {
+        override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+        override fun onComplete() { cont.resume(Unit) }
+        override fun onError(e: Throwable) { cont.resumeWithException(e) }
+    })
+}
+
+// ------------------------ MaybeSource ------------------------
+
+/**
+ * Awaits for completion of the maybe without blocking a thread.
+ * Returns the resulting value, null if no value was produced or throws the corresponding exception if this
+ * maybe had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+@Suppress("UNCHECKED_CAST")
+public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)
+
+/**
+ * Awaits for completion of the maybe without blocking a thread.
+ * Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this
+ * maybe had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
+    subscribe(object : MaybeObserver<T> {
+        override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+        override fun onComplete() { cont.resume(default) }
+        override fun onSuccess(t: T) { cont.resume(t) }
+        override fun onError(error: Throwable) { cont.resumeWithException(error) }
+    })
+}
+
+// ------------------------ SingleSource ------------------------
+
+/**
+ * Awaits for completion of the single value without blocking a thread.
+ * Returns the resulting value or throws the corresponding exception if this single had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
+    subscribe(object : SingleObserver<T> {
+        override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+        override fun onSuccess(t: T) { cont.resume(t) }
+        override fun onError(error: Throwable) { cont.resumeWithException(error) }
+    })
+}
+
+// ------------------------ ObservableSource ------------------------
+
+/**
+ * Awaits for the first value from the given observable without blocking a thread.
+ * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ *
+ * @throws NoSuchElementException if observable does not emit any value
+ */
+public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
+
+/**
+ * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
+ * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
+
+/**
+ * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
+ * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
+
+/**
+ * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
+ * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
+
+/**
+ * Awaits for the last value from the given observable without blocking a thread.
+ * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ *
+ * @throws NoSuchElementException if observable does not emit any value
+ */
+public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
+
+/**
+ * Awaits for the single value from the given observable without blocking a thread.
+ * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ *
+ * @throws NoSuchElementException if observable does not emit any value
+ * @throws IllegalArgumentException if observable emits more than one value
+ */
+public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
+
+// ------------------------ private ------------------------
+
+internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
+    invokeOnCancellation { d.dispose() }
+
+private enum class Mode(val s: String) {
+    FIRST("awaitFirst"),
+    FIRST_OR_DEFAULT("awaitFirstOrDefault"),
+    LAST("awaitLast"),
+    SINGLE("awaitSingle");
+    override fun toString(): String = s
+}
+
+private suspend fun <T> ObservableSource<T>.awaitOne(
+    mode: Mode,
+    default: T? = null
+): T = suspendCancellableCoroutine { cont ->
+    subscribe(object : Observer<T> {
+        private lateinit var subscription: Disposable
+        private var value: T? = null
+        private var seenValue = false
+
+        override fun onSubscribe(sub: Disposable) {
+            subscription = sub
+            cont.invokeOnCancellation { sub.dispose() }
+        }
+
+        override fun onNext(t: T) {
+            when (mode) {
+                Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
+                    if (!seenValue) {
+                        seenValue = true
+                        cont.resume(t)
+                        subscription.dispose()
+                    }
+                }
+                Mode.LAST, Mode.SINGLE -> {
+                    if (mode == Mode.SINGLE && seenValue) {
+                        if (cont.isActive)
+                            cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
+                        subscription.dispose()
+                    } else {
+                        value = t
+                        seenValue = true
+                    }
+                }
+            }
+        }
+
+        @Suppress("UNCHECKED_CAST")
+        override fun onComplete() {
+            if (seenValue) {
+                if (cont.isActive) cont.resume(value as T)
+                return
+            }
+            when {
+                mode == Mode.FIRST_OR_DEFAULT -> {
+                    cont.resume(default as T)
+                }
+                cont.isActive -> {
+                    cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
+                }
+            }
+        }
+
+        override fun onError(e: Throwable) {
+            cont.resumeWithException(e)
+        }
+    })
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
new file mode 100644
index 0000000..0b57b8b
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.functions.*
+import io.reactivex.rxjava3.plugins.*
+import kotlinx.coroutines.*
+import kotlin.coroutines.*
+
+internal class RxCancellable(private val job: Job) : Cancellable {
+    override fun cancel() {
+        job.cancel()
+    }
+}
+
+internal fun handleUndeliverableException(cause: Throwable, context: CoroutineContext) {
+    if (cause is CancellationException) return // Async CE should be completely ignored
+    try {
+        RxJavaPlugins.onError(cause)
+    } catch (e: Throwable) {
+        handleCoroutineException(context, cause)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
new file mode 100644
index 0000000..acb907b
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.internal.*
+
+/**
+ * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it.
+ * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
+ *
+ * This API is internal in the favour of [Flow].
+ * [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
+ */
+@PublishedApi
+internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
+    val channel = SubscriptionChannel<T>()
+    subscribe(channel)
+    return channel
+}
+
+/**
+ * Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it.
+ * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
+ *
+ * This API is internal in the favour of [Flow].
+ * [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
+ */
+@PublishedApi
+internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
+    val channel = SubscriptionChannel<T>()
+    subscribe(channel)
+    return channel
+}
+
+/**
+ * Subscribes to this [MaybeSource] and performs the specified action for each received element.
+ * Cancels subscription if any exception happens during collect.
+ */
+public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit =
+    openSubscription().consumeEach(action)
+
+/**
+ * Subscribes to this [ObservableSource] and performs the specified action for each received element.
+ * Cancels subscription if any exception happens during collect.
+ */
+public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
+    openSubscription().consumeEach(action)
+
+@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+private class SubscriptionChannel<T> :
+    LinkedListChannel<T>(), Observer<T>, MaybeObserver<T>
+{
+    private val _subscription = atomic<Disposable?>(null)
+
+    @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
+    override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
+        _subscription.getAndSet(null)?.dispose() // dispose exactly once
+    }
+
+    // Observer overrider
+    override fun onSubscribe(sub: Disposable) {
+        _subscription.value = sub
+    }
+
+    override fun onSuccess(t: T) {
+        offer(t)
+    }
+
+    override fun onNext(t: T) {
+        offer(t)
+    }
+
+    override fun onComplete() {
+        close(cause = null)
+    }
+
+    override fun onError(e: Throwable) {
+        close(cause = e)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
new file mode 100644
index 0000000..54b412f
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlin.coroutines.*
+
+/**
+ * Creates cold [Completable] that runs a given [block] in a coroutine and emits its result.
+ * Every time the returned completable is subscribed, it starts a new coroutine.
+ * Unsubscribing cancels running coroutine.
+ * Coroutine context can be specified with [context] argument.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ */
+public fun rxCompletable(
+    context: CoroutineContext = EmptyCoroutineContext,
+    block: suspend CoroutineScope.() -> Unit
+): Completable {
+    require(context[Job] === null) { "Completable context cannot contain job in it." +
+            "Its lifecycle should be managed via Disposable handle. Had $context" }
+    return rxCompletableInternal(GlobalScope, context, block)
+}
+
+private fun rxCompletableInternal(
+    scope: CoroutineScope, // support for legacy rxCompletable in scope
+    context: CoroutineContext,
+    block: suspend CoroutineScope.() -> Unit
+): Completable = Completable.create { subscriber ->
+    val newContext = scope.newCoroutineContext(context)
+    val coroutine = RxCompletableCoroutine(newContext, subscriber)
+    subscriber.setCancellable(RxCancellable(coroutine))
+    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
+
+private class RxCompletableCoroutine(
+    parentContext: CoroutineContext,
+    private val subscriber: CompletableEmitter
+) : AbstractCoroutine<Unit>(parentContext, true) {
+    override fun onCompleted(value: Unit) {
+        try {
+            subscriber.onComplete()
+        } catch (e: Throwable) {
+            handleUndeliverableException(e, context)
+        }
+    }
+
+    override fun onCancelled(cause: Throwable, handled: Boolean) {
+        try {
+            if (!subscriber.tryOnError(cause)) {
+                handleUndeliverableException(cause, context)
+            }
+        } catch (e: Throwable) {
+            handleUndeliverableException(e, context)
+        }
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
new file mode 100644
index 0000000..f9e2e21
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import java.util.concurrent.atomic.*
+import kotlin.coroutines.*
+
+/**
+ * Converts this job to the hot reactive completable that signals
+ * with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
+ *
+ * Every subscriber gets the signal at the same time.
+ * Unsubscribing from the resulting completable **does not** affect the original job in any way.
+ *
+ * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
+ *    in the future to account for the concept of structured concurrency.
+ *
+ * @param context -- the coroutine context from which the resulting completable is going to be signalled
+ */
+@ExperimentalCoroutinesApi
+public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
+    this@asCompletable.join()
+}
+
+/**
+ * Converts this deferred value to the hot reactive maybe that signals
+ * [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
+ *
+ * Every subscriber gets the same completion value.
+ * Unsubscribing from the resulting maybe **does not** affect the original deferred value in any way.
+ *
+ * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
+ *    in the future to account for the concept of structured concurrency.
+ *
+ * @param context -- the coroutine context from which the resulting maybe is going to be signalled
+ */
+@ExperimentalCoroutinesApi
+public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
+    this@asMaybe.await()
+}
+
+/**
+ * Converts this deferred value to the hot reactive single that signals either
+ * [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
+ *
+ * Every subscriber gets the same completion value.
+ * Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
+ *
+ * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
+ *    in the future to account for the concept of structured concurrency.
+ *
+ * @param context -- the coroutine context from which the resulting single is going to be signalled
+ */
+@ExperimentalCoroutinesApi
+public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) {
+    this@asSingle.await()
+}
+
+/**
+ * Transforms given cold [ObservableSource] into cold [Flow].
+ *
+ * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
+ * is applied to the resulting flow.
+ *
+ * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
+ * resulting flow to specify a user-defined value and to control what happens when data is produced faster
+ * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
+ */
+@ExperimentalCoroutinesApi
+public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
+    val disposableRef = AtomicReference<Disposable>()
+    val observer = object : Observer<T> {
+        override fun onComplete() { close() }
+        override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
+        override fun onNext(t: T) { sendBlocking(t) }
+        override fun onError(e: Throwable) { close(e) }
+    }
+
+    subscribe(observer)
+    awaitClose { disposableRef.getAndSet(Disposable.disposed())?.dispose() }
+}
+
+/**
+ * Converts the given flow to a cold observable.
+ * The original flow is cancelled when the observable subscriber is disposed.
+ */
+@JvmName("from")
+@ExperimentalCoroutinesApi
+public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create { emitter ->
+    /*
+     * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
+     * asObservable is already invoked from unconfined
+     */
+    val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
+        try {
+            collect { value -> emitter.onNext(value) }
+            emitter.onComplete()
+        } catch (e: Throwable) {
+            // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
+            if (e !is CancellationException) {
+                if (!emitter.tryOnError(e)) {
+                    handleUndeliverableException(e, coroutineContext)
+                }
+            } else {
+                emitter.onComplete()
+            }
+        }
+    }
+    emitter.setCancellable(RxCancellable(job))
+}
+
+/**
+ * Converts the given flow to a cold flowable.
+ * The original flow is cancelled when the flowable subscriber is disposed.
+ */
+@JvmName("from")
+@ExperimentalCoroutinesApi
+public fun <T: Any> Flow<T>.asFlowable(): Flowable<T> = Flowable.fromPublisher(asPublisher())
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt
new file mode 100644
index 0000000..2de46a6
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.reactive.*
+import kotlin.coroutines.*
+
+/**
+ * Creates cold [flowable][Flowable] that will run a given [block] in a coroutine.
+ * Every time the returned flowable is subscribed, it starts a new coroutine.
+ *
+ * Coroutine emits ([ObservableEmitter.onNext]) values with `send`, completes ([ObservableEmitter.onComplete])
+ * when the coroutine completes or channel is explicitly closed and emits error ([ObservableEmitter.onError])
+ * if coroutine throws an exception or closes channel with a cause.
+ * Unsubscribing cancels running coroutine.
+ *
+ * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
+ * `onNext` is not invoked concurrently.
+ *
+ * Coroutine context can be specified with [context] argument.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ *
+ * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
+ */
+@ExperimentalCoroutinesApi
+public fun <T: Any> rxFlowable(
+    context: CoroutineContext = EmptyCoroutineContext,
+    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Flowable<T> {
+    require(context[Job] === null) { "Flowable context cannot contain job in it." +
+            "Its lifecycle should be managed via Disposable handle. Had $context" }
+    return Flowable.fromPublisher(publishInternal(GlobalScope, context, RX_HANDLER, block))
+}
+
+private val RX_HANDLER: (Throwable, CoroutineContext) -> Unit = ::handleUndeliverableException
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
new file mode 100644
index 0000000..4d55ef5
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlin.coroutines.*
+
+/**
+ * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine and emits its result.
+ * If [block] result is `null`, [onComplete][MaybeObserver.onComplete] is invoked without a value.
+ * Every time the returned observable is subscribed, it starts a new coroutine.
+ * Unsubscribing cancels running coroutine.
+ * Coroutine context can be specified with [context] argument.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ */
+public fun <T> rxMaybe(
+    context: CoroutineContext = EmptyCoroutineContext,
+    block: suspend CoroutineScope.() -> T?
+): Maybe<T> {
+    require(context[Job] === null) { "Maybe context cannot contain job in it." +
+            "Its lifecycle should be managed via Disposable handle. Had $context" }
+    return rxMaybeInternal(GlobalScope, context, block)
+}
+
+private fun <T> rxMaybeInternal(
+    scope: CoroutineScope, // support for legacy rxMaybe in scope
+    context: CoroutineContext,
+    block: suspend CoroutineScope.() -> T?
+): Maybe<T> = Maybe.create { subscriber ->
+    val newContext = scope.newCoroutineContext(context)
+    val coroutine = RxMaybeCoroutine(newContext, subscriber)
+    subscriber.setCancellable(RxCancellable(coroutine))
+    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
+
+private class RxMaybeCoroutine<T>(
+    parentContext: CoroutineContext,
+    private val subscriber: MaybeEmitter<T>
+) : AbstractCoroutine<T>(parentContext, true) {
+    override fun onCompleted(value: T) {
+        try {
+            if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
+        } catch (e: Throwable) {
+            handleUndeliverableException(e, context)
+        }
+    }
+
+    override fun onCancelled(cause: Throwable, handled: Boolean) {
+        try {
+            if (!subscriber.tryOnError(cause)) {
+                handleUndeliverableException(cause, context)
+            }
+        } catch (e: Throwable) {
+            handleUndeliverableException(e, context)
+        }
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
new file mode 100644
index 0000000..102d06e
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.exceptions.*
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.selects.*
+import kotlinx.coroutines.sync.*
+import kotlin.coroutines.*
+
+/**
+ * Creates cold [observable][Observable] that will run a given [block] in a coroutine.
+ * Every time the returned observable is subscribed, it starts a new coroutine.
+ *
+ * Coroutine emits ([ObservableEmitter.onNext]) values with `send`, completes ([ObservableEmitter.onComplete])
+ * when the coroutine completes or channel is explicitly closed and emits error ([ObservableEmitter.onError])
+ * if coroutine throws an exception or closes channel with a cause.
+ * Unsubscribing cancels running coroutine.
+ *
+ * Invocations of `send` are suspended appropriately to ensure that `onNext` is not invoked concurrently.
+ * Note that Rx 2.x [Observable] **does not support backpressure**.
+ *
+ * Coroutine context can be specified with [context] argument.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ */
+@ExperimentalCoroutinesApi
+public fun <T : Any> rxObservable(
+    context: CoroutineContext = EmptyCoroutineContext,
+    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Observable<T> {
+    require(context[Job] === null) { "Observable context cannot contain job in it." +
+            "Its lifecycle should be managed via Disposable handle. Had $context" }
+    return rxObservableInternal(GlobalScope, context, block)
+}
+
+private fun <T : Any> rxObservableInternal(
+    scope: CoroutineScope, // support for legacy rxObservable in scope
+    context: CoroutineContext,
+    block: suspend ProducerScope<T>.() -> Unit
+): Observable<T> = Observable.create { subscriber ->
+    val newContext = scope.newCoroutineContext(context)
+    val coroutine = RxObservableCoroutine(newContext, subscriber)
+    subscriber.setCancellable(RxCancellable(coroutine)) // do it first (before starting coroutine), to await unnecessary suspensions
+    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
+
+private const val OPEN = 0        // open channel, still working
+private const val CLOSED = -1     // closed, but have not signalled onCompleted/onError yet
+private const val SIGNALLED = -2  // already signalled subscriber onCompleted/onError
+
+private class RxObservableCoroutine<T: Any>(
+    parentContext: CoroutineContext,
+    private val subscriber: ObservableEmitter<T>
+) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
+    override val channel: SendChannel<T> get() = this
+
+    // Mutex is locked when while subscriber.onXXX is being invoked
+    private val mutex = Mutex()
+
+    private val _signal = atomic(OPEN)
+
+    override val isClosedForSend: Boolean get() = isCompleted
+    override val isFull: Boolean = mutex.isLocked
+    override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
+    override fun invokeOnClose(handler: (Throwable?) -> Unit) =
+        throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")
+
+    override fun offer(element: T): Boolean {
+        if (!mutex.tryLock()) return false
+        doLockedNext(element)
+        return true
+    }
+
+    public override suspend fun send(element: T) {
+        // fast-path -- try send without suspension
+        if (offer(element)) return
+        // slow-path does suspend
+        return sendSuspend(element)
+    }
+
+    private suspend fun sendSuspend(element: T) {
+        mutex.lock()
+        doLockedNext(element)
+    }
+
+    override val onSend: SelectClause2<T, SendChannel<T>>
+        get() = this
+
+    // registerSelectSend
+    @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
+    override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+        mutex.onLock.registerSelectClause2(select, null) {
+            doLockedNext(element)
+            block(this)
+        }
+    }
+
+    // assert: mutex.isLocked()
+    private fun doLockedNext(elem: T) {
+        // check if already closed for send
+        if (!isActive) {
+            doLockedSignalCompleted(completionCause, completionCauseHandled)
+            throw getCancellationException()
+        }
+        // notify subscriber
+        try {
+            subscriber.onNext(elem)
+        } catch (e: Throwable) {
+            // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
+            // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
+            // this failure is essentially equivalent to a failure of a child coroutine.
+            cancelCoroutine(e)
+            mutex.unlock()
+            throw e
+        }
+        /*
+         * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
+         * happen after this check and before `unlock` (see signalCompleted that does not do anything
+         * if it fails to acquire the lock that we are still holding).
+         * We have to recheck `isCompleted` after `unlock` anyway.
+         */
+        unlockAndCheckCompleted()
+    }
+
+    private fun unlockAndCheckCompleted() {
+        mutex.unlock()
+        // recheck isActive
+        if (!isActive && mutex.tryLock())
+            doLockedSignalCompleted(completionCause, completionCauseHandled)
+    }
+
+    // assert: mutex.isLocked()
+    private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
+        // cancellation failures
+        try {
+            if (_signal.value >= CLOSED) {
+                _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+                try {
+                    if (cause != null && cause !is CancellationException) {
+                        /*
+                         * Reactive frameworks have two types of exceptions: regular and fatal.
+                         * Regular are passed to onError.
+                         * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
+                         * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
+                         * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
+                         * thrown by subscriber or upstream).
+                         * To make behaviour consistent and least surprising, we always handle fatal exceptions
+                         * by coroutines machinery, anyway, they should not be present in regular program flow,
+                         * thus our goal here is just to expose it as soon as possible.
+                         */
+                        subscriber.tryOnError(cause)
+                        if (!handled && cause.isFatal()) {
+                            handleUndeliverableException(cause, context)
+                        }
+                    }
+                    else {
+                        subscriber.onComplete()
+                    }
+                } catch (e: Throwable) {
+                    // Unhandled exception (cannot handle in other way, since we are already complete)
+                    handleUndeliverableException(e, context)
+                }
+            }
+        } finally {
+            mutex.unlock()
+        }
+    }
+
+    private fun signalCompleted(cause: Throwable?, handled: Boolean) {
+        if (!_signal.compareAndSet(OPEN, CLOSED)) return // abort, other thread invoked doLockedSignalCompleted
+        if (mutex.tryLock()) // if we can acquire the lock
+            doLockedSignalCompleted(cause, handled)
+    }
+
+    override fun onCompleted(value: Unit) {
+        signalCompleted(null, false)
+    }
+
+    override fun onCancelled(cause: Throwable, handled: Boolean) {
+        signalCompleted(cause, handled)
+    }
+}
+
+internal fun Throwable.isFatal() = try {
+    Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode
+    false
+} catch (e: Throwable) {
+    true
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt
new file mode 100644
index 0000000..6e91aee
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.Scheduler
+import kotlinx.coroutines.*
+import java.util.concurrent.TimeUnit
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
+ * and provides native support of [delay] and [withTimeout].
+ */
+public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this)
+
+/**
+ * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
+ */
+public class SchedulerCoroutineDispatcher(
+    /**
+     * Underlying scheduler of current [CoroutineDispatcher].
+     */
+    public val scheduler: Scheduler
+) : CoroutineDispatcher(), Delay {
+    /** @suppress */
+    override fun dispatch(context: CoroutineContext, block: Runnable) {
+        scheduler.scheduleDirect(block)
+    }
+
+    /** @suppress */
+    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
+        val disposable = scheduler.scheduleDirect({
+            with(continuation) { resumeUndispatched(Unit) }
+        }, timeMillis, TimeUnit.MILLISECONDS)
+        continuation.disposeOnCancellation(disposable)
+    }
+
+    /** @suppress */
+    override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
+        val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS)
+        return DisposableHandle { disposable.dispose() }
+    }
+
+    /** @suppress */
+    override fun toString(): String = scheduler.toString()
+    /** @suppress */
+    override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler
+    /** @suppress */
+    override fun hashCode(): Int = System.identityHashCode(scheduler)
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
new file mode 100644
index 0000000..225df93
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlin.coroutines.*
+
+/**
+ * Creates cold [single][Single] that will run a given [block] in a coroutine and emits its result.
+ * Every time the returned observable is subscribed, it starts a new coroutine.
+ * Unsubscribing cancels running coroutine.
+ * Coroutine context can be specified with [context] argument.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ */
+public fun <T : Any> rxSingle(
+    context: CoroutineContext = EmptyCoroutineContext,
+    block: suspend CoroutineScope.() -> T
+): Single<T> {
+    require(context[Job] === null) { "Single context cannot contain job in it." +
+            "Its lifecycle should be managed via Disposable handle. Had $context" }
+    return rxSingleInternal(GlobalScope, context, block)
+}
+
+private fun <T : Any> rxSingleInternal(
+    scope: CoroutineScope, // support for legacy rxSingle in scope
+    context: CoroutineContext,
+    block: suspend CoroutineScope.() -> T
+): Single<T> = Single.create { subscriber ->
+    val newContext = scope.newCoroutineContext(context)
+    val coroutine = RxSingleCoroutine(newContext, subscriber)
+    subscriber.setCancellable(RxCancellable(coroutine))
+    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
+
+private class RxSingleCoroutine<T: Any>(
+    parentContext: CoroutineContext,
+    private val subscriber: SingleEmitter<T>
+) : AbstractCoroutine<T>(parentContext, true) {
+    override fun onCompleted(value: T) {
+        try {
+            subscriber.onSuccess(value)
+        } catch (e: Throwable) {
+            handleUndeliverableException(e, context)
+        }
+    }
+
+    override fun onCancelled(cause: Throwable, handled: Boolean) {
+        try {
+            if (!subscriber.tryOnError(cause)) {
+                handleUndeliverableException(cause, context)
+            }
+        } catch (e: Throwable) {
+            handleUndeliverableException(e, context)
+        }
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-rx3/test/BackpressureTest.kt
new file mode 100644
index 0000000..9cec483
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/BackpressureTest.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import org.junit.Test
+import kotlin.test.*
+
+class BackpressureTest : TestBase() {
+    @Test
+    fun testBackpressureDropDirect() = runTest {
+        expect(1)
+        Flowable.fromArray(1)
+            .onBackpressureDrop()
+            .collect {
+                assertEquals(1, it)
+                expect(2)
+            }
+        finish(3)
+    }
+
+    @Test
+    fun testBackpressureDropFlow() = runTest {
+        expect(1)
+        Flowable.fromArray(1)
+            .onBackpressureDrop()
+            .asFlow()
+            .collect {
+                assertEquals(1, it)
+                expect(2)
+            }
+        finish(3)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/Check.kt b/reactive/kotlinx-coroutines-rx3/test/Check.kt
new file mode 100644
index 0000000..3d4704f
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/Check.kt
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.plugins.*
+
+fun <T> checkSingleValue(
+    observable: Observable<T>,
+    checker: (T) -> Unit
+) {
+    val singleValue = observable.blockingSingle()
+    checker(singleValue)
+}
+
+fun checkErroneous(
+        observable: Observable<*>,
+        checker: (Throwable) -> Unit
+) {
+    val singleNotification = observable.materialize().blockingSingle()
+    val error = singleNotification.error ?: error("Excepted error")
+    checker(error)
+}
+
+fun <T> checkSingleValue(
+    single: Single<T>,
+    checker: (T) -> Unit
+) {
+    val singleValue = single.blockingGet()
+    checker(singleValue)
+}
+
+fun checkErroneous(
+    single: Single<*>,
+    checker: (Throwable) -> Unit
+) {
+    try {
+        single.blockingGet()
+        error("Should have failed")
+    } catch (e: Throwable) {
+        checker(e)
+    }
+}
+
+fun <T> checkMaybeValue(
+        maybe: Maybe<T>,
+        checker: (T?) -> Unit
+) {
+    val maybeValue = maybe.toFlowable().blockingIterable().firstOrNull()
+    checker(maybeValue)
+}
+
+@Suppress("UNCHECKED_CAST")
+fun checkErroneous(
+    maybe: Maybe<*>,
+    checker: (Throwable) -> Unit
+) {
+    try {
+        (maybe as Maybe<Any>).blockingGet()
+        error("Should have failed")
+    } catch (e: Throwable) {
+        checker(e)
+    }
+}
+
+inline fun withExceptionHandler(noinline handler: (Throwable) -> Unit, block: () -> Unit) {
+    val original = RxJavaPlugins.getErrorHandler()
+    RxJavaPlugins.setErrorHandler { handler(it) }
+    try {
+        block()
+    } finally {
+        RxJavaPlugins.setErrorHandler(original)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt
new file mode 100644
index 0000000..e5399d1
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
+import io.reactivex.rxjava3.exceptions.*
+import kotlinx.coroutines.*
+import org.junit.Test
+import kotlin.test.*
+
+class CompletableTest : TestBase() {
+    @Test
+    fun testBasicSuccess() = runBlocking {
+        expect(1)
+        val completable = rxCompletable(currentDispatcher()) {
+            expect(4)
+        }
+        expect(2)
+        completable.subscribe {
+            expect(5)
+        }
+        expect(3)
+        yield() // to completable coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicFailure() = runBlocking {
+        expect(1)
+        val completable = rxCompletable(currentDispatcher()) {
+            expect(4)
+            throw RuntimeException("OK")
+        }
+        expect(2)
+        completable.subscribe({
+            expectUnreached()
+        }, { error ->
+            expect(5)
+            assertTrue(error is RuntimeException)
+            assertEquals("OK", error.message)
+        })
+        expect(3)
+        yield() // to completable coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicUnsubscribe() = runBlocking {
+        expect(1)
+        val completable = rxCompletable(currentDispatcher()) {
+            expect(4)
+            yield() // back to main, will get cancelled
+            expectUnreached()
+        }
+        expect(2)
+        // nothing is called on a disposed rx3 completable
+        val sub = completable.subscribe({
+            expectUnreached()
+        }, {
+            expectUnreached()
+        })
+        expect(3)
+        yield() // to started coroutine
+        expect(5)
+        sub.dispose() // will cancel coroutine
+        yield()
+        finish(6)
+    }
+
+    @Test
+    fun testAwaitSuccess() = runBlocking {
+        expect(1)
+        val completable = rxCompletable(currentDispatcher()) {
+            expect(3)
+        }
+        expect(2)
+        completable.await() // shall launch coroutine
+        finish(4)
+    }
+
+    @Test
+    fun testAwaitFailure() = runBlocking {
+        expect(1)
+        val completable = rxCompletable(currentDispatcher()) {
+            expect(3)
+            throw RuntimeException("OK")
+        }
+        expect(2)
+        try {
+            completable.await() // shall launch coroutine and throw exception
+            expectUnreached()
+        } catch (e: RuntimeException) {
+            finish(4)
+            assertEquals("OK", e.message)
+        }
+    }
+
+    @Test
+    fun testSuppressedException() = runTest {
+        val completable = rxCompletable(currentDispatcher()) {
+            launch(start = CoroutineStart.ATOMIC) {
+                throw TestException() // child coroutine fails
+            }
+            try {
+                delay(Long.MAX_VALUE)
+            } finally {
+                throw TestException2() // but parent throws another exception while cleaning up
+            }
+        }
+        try {
+            completable.await()
+            expectUnreached()
+        } catch (e: TestException) {
+            assertTrue(e.suppressed[0] is TestException2)
+        }
+    }
+
+    @Test
+    fun testUnhandledException() = runTest() {
+        expect(1)
+        var disposable: Disposable? = null
+        val handler = { e: Throwable ->
+            assertTrue(e is UndeliverableException && e.cause is TestException)
+            expect(5)
+        }
+        val completable = rxCompletable(currentDispatcher()) {
+            expect(4)
+            disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
+            try {
+                delay(Long.MAX_VALUE)
+            } finally {
+                throw TestException() // would not be able to handle it since mono is disposed
+            }
+        }
+        withExceptionHandler(handler) {
+            completable.subscribe(object : CompletableObserver {
+                override fun onSubscribe(d: Disposable) {
+                    expect(2)
+                    disposable = d
+                }
+
+                override fun onComplete() {
+                    expectUnreached()
+                }
+
+                override fun onError(t: Throwable) {
+                    expectUnreached()
+                }
+            })
+            expect(3)
+            yield() // run coroutine
+            finish(6)
+        }
+    }
+
+    @Test
+    fun testFatalExceptionInSubscribe() = runTest {
+        val handler: (Throwable) -> Unit = { e ->
+            assertTrue(e is UndeliverableException && e.cause is LinkageError); expect(2)
+        }
+
+        withExceptionHandler(handler) {
+            rxCompletable(Dispatchers.Unconfined) {
+                expect(1)
+                42
+            }.subscribe({ throw LinkageError() })
+            finish(3)
+        }
+    }
+
+    @Test
+    fun testFatalExceptionInSingle() = runTest {
+        rxCompletable(Dispatchers.Unconfined) {
+            throw LinkageError()
+        }.subscribe({ expectUnreached()  }, { expect(1); assertTrue(it is LinkageError) })
+        finish(2)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ConvertTest.kt b/reactive/kotlinx-coroutines-rx3/test/ConvertTest.kt
new file mode 100644
index 0000000..e2fe533
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ConvertTest.kt
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.consumeAsFlow
+import org.junit.Assert
+import org.junit.Test
+import kotlin.test.*
+
+class ConvertTest : TestBase() {
+    @Test
+    fun testToCompletableSuccess() = runBlocking {
+        expect(1)
+        val job = launch {
+            expect(3)
+        }
+        val completable = job.asCompletable(coroutineContext.minusKey(Job))
+        completable.subscribe {
+            expect(4)
+        }
+        expect(2)
+        yield()
+        finish(5)
+    }
+
+    @Test
+    fun testToCompletableFail() = runBlocking {
+        expect(1)
+        val job = async(NonCancellable) { // don't kill parent on exception
+            expect(3)
+            throw RuntimeException("OK")
+        }
+        val completable = job.asCompletable(coroutineContext.minusKey(Job))
+        completable.subscribe {
+            expect(4)
+        }
+        expect(2)
+        yield()
+        finish(5)
+    }
+
+    @Test
+    fun testToMaybe() {
+        val d = GlobalScope.async {
+            delay(50)
+            "OK"
+        }
+        val maybe1 = d.asMaybe(Dispatchers.Unconfined)
+        checkMaybeValue(maybe1) {
+            assertEquals("OK", it)
+        }
+        val maybe2 = d.asMaybe(Dispatchers.Unconfined)
+        checkMaybeValue(maybe2) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testToMaybeEmpty() {
+        val d = GlobalScope.async {
+            delay(50)
+            null
+        }
+        val maybe1 = d.asMaybe(Dispatchers.Unconfined)
+        checkMaybeValue(maybe1, Assert::assertNull)
+        val maybe2 = d.asMaybe(Dispatchers.Unconfined)
+        checkMaybeValue(maybe2, Assert::assertNull)
+    }
+
+    @Test
+    fun testToMaybeFail() {
+        val d = GlobalScope.async {
+            delay(50)
+            throw TestRuntimeException("OK")
+        }
+        val maybe1 = d.asMaybe(Dispatchers.Unconfined)
+        checkErroneous(maybe1) {
+            check(it is TestRuntimeException && it.message == "OK") { "$it" }
+        }
+        val maybe2 = d.asMaybe(Dispatchers.Unconfined)
+        checkErroneous(maybe2) {
+            check(it is TestRuntimeException && it.message == "OK") { "$it" }
+        }
+    }
+
+    @Test
+    fun testToSingle() {
+        val d = GlobalScope.async {
+            delay(50)
+            "OK"
+        }
+        val single1 = d.asSingle(Dispatchers.Unconfined)
+        checkSingleValue(single1) {
+            assertEquals("OK", it)
+        }
+        val single2 = d.asSingle(Dispatchers.Unconfined)
+        checkSingleValue(single2) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testToSingleFail() {
+        val d = GlobalScope.async {
+            delay(50)
+            throw TestRuntimeException("OK")
+        }
+        val single1 = d.asSingle(Dispatchers.Unconfined)
+        checkErroneous(single1) {
+            check(it is TestRuntimeException && it.message == "OK") { "$it" }
+        }
+        val single2 = d.asSingle(Dispatchers.Unconfined)
+        checkErroneous(single2) {
+            check(it is TestRuntimeException && it.message == "OK") { "$it" }
+        }
+    }
+
+    @Test
+    fun testToObservable() {
+        val c = GlobalScope.produce {
+            delay(50)
+            send("O")
+            delay(50)
+            send("K")
+        }
+        val observable = c.consumeAsFlow().asObservable()
+        checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testToObservableFail() {
+        val c = GlobalScope.produce {
+            delay(50)
+            send("O")
+            delay(50)
+            throw TestException("K")
+        }
+        val observable = c.consumeAsFlow().asObservable()
+        val single = rxSingle(Dispatchers.Unconfined) {
+            var result = ""
+            try {
+                observable.collect { result += it }
+            } catch(e: Throwable) {
+                check(e is TestException)
+                result += e.message
+            }
+            result
+        }
+        checkSingleValue(single) {
+            assertEquals("OK", it)
+        }
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt
new file mode 100644
index 0000000..50c4ae7
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.junit.Test
+import kotlin.test.*
+
+class FlowAsObservableTest : TestBase() {
+    @Test
+    fun testBasicSuccess() = runTest {
+        expect(1)
+        val observable = flow {
+            expect(3)
+            emit("OK")
+        }.asObservable()
+
+        expect(2)
+        observable.subscribe { value ->
+            expect(4)
+            assertEquals("OK", value)
+        }
+
+        finish(5)
+    }
+
+    @Test
+    fun testBasicFailure() = runTest {
+        expect(1)
+        val observable = flow<Int> {
+            expect(3)
+            throw RuntimeException("OK")
+        }.asObservable()
+
+        expect(2)
+        observable.subscribe({ expectUnreached() }, { error ->
+            expect(4)
+            assertTrue(error is RuntimeException)
+            assertEquals("OK", error.message)
+        })
+        finish(5)
+    }
+
+    @Test
+    fun testBasicUnsubscribe() = runTest {
+        expect(1)
+        val observable = flow<Int> {
+            expect(3)
+            hang {
+                expect(4)
+            }
+        }.asObservable()
+
+        expect(2)
+        val sub = observable.subscribe({ expectUnreached() }, { expectUnreached() })
+        sub.dispose() // will cancel coroutine
+        finish(5)
+    }
+
+    @Test
+    fun testNotifyOnceOnCancellation() = runTest {
+        val observable =
+            flow {
+                expect(3)
+                emit("OK")
+                hang {
+                    expect(7)
+                }
+            }.asObservable()
+                .doOnNext {
+                    expect(4)
+                    assertEquals("OK", it)
+                }
+                .doOnDispose {
+                    expect(6) // notified once!
+                }
+
+        expect(1)
+        val job = launch(start = CoroutineStart.UNDISPATCHED) {
+            expect(2)
+            observable.collect {
+                expect(5)
+                assertEquals("OK", it)
+            }
+        }
+
+        yield()
+        job.cancelAndJoin()
+        finish(8)
+    }
+
+    @Test
+    fun testFailingConsumer() = runTest {
+        expect(1)
+        val observable = flow {
+            expect(2)
+            emit("OK")
+            hang {
+                expect(4)
+            }
+
+        }.asObservable()
+
+        try {
+            observable.collect {
+                expect(3)
+                throw TestException()
+            }
+        } catch (e: TestException) {
+            finish(5)
+        }
+    }
+
+    @Test
+    fun testNonAtomicStart() = runTest {
+        withContext(Dispatchers.Unconfined) {
+            val observable = flow<Int> {
+                expect(1)
+            }.asObservable()
+
+            val disposable = observable.subscribe({ expectUnreached() }, { expectUnreached() }, { expectUnreached() })
+            disposable.dispose()
+        }
+        finish(2)
+    }
+
+    @Test
+    fun testFlowCancelledFromWithin() = runTest {
+        val observable = flow {
+            expect(1)
+            emit(1)
+            kotlin.coroutines.coroutineContext.cancel()
+            kotlin.coroutines.coroutineContext.ensureActive()
+            expectUnreached()
+        }.asObservable()
+
+        observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) })
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowableContextTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowableContextTest.kt
new file mode 100644
index 0000000..b70e0d5
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowableContextTest.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import org.junit.*
+import org.junit.Test
+import kotlin.test.*
+
+class FlowableContextTest : TestBase() {
+    private val dispatcher = newSingleThreadContext("FlowableContextTest")
+
+    @After
+    fun tearDown() {
+        dispatcher.close()
+    }
+
+    @Test
+    fun testFlowableCreateAsFlowThread() = runTest {
+        expect(1)
+        val mainThread = Thread.currentThread()
+        val dispatcherThread = withContext(dispatcher) { Thread.currentThread() }
+        assertTrue(dispatcherThread != mainThread)
+        Flowable.create<String>({
+            assertEquals(dispatcherThread, Thread.currentThread())
+            it.onNext("OK")
+            it.onComplete()
+        }, BackpressureStrategy.BUFFER)
+            .asFlow()
+            .flowOn(dispatcher)
+            .collect {
+                expect(2)
+                assertEquals("OK", it)
+                assertEquals(mainThread, Thread.currentThread())
+            }
+        finish(3)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
new file mode 100644
index 0000000..8cbd7ee
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.exceptions.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import kotlin.test.*
+
+class FlowableExceptionHandlingTest : TestBase() {
+
+    @Before
+    fun setup() {
+        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+    }
+
+    private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
+        assertTrue(t is UndeliverableException && t.cause is T)
+        expect(expect)
+    }
+
+    private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() }
+
+    @Test
+    fun testException() = withExceptionHandler({ expectUnreached() }) {
+        rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
+            expect(1)
+            throw TestException()
+        }.subscribe({
+            expectUnreached()
+        }, {
+            expect(2) // Reported to onError
+        })
+        finish(3)
+    }
+
+    @Test
+    fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
+        rxFlowable<Int>(Dispatchers.Unconfined) {
+            expect(1)
+            throw LinkageError()
+        }.subscribe({
+            expectUnreached()
+        }, {
+            expect(2) // Fatal exception is reported to both onError and CEH
+        })
+        finish(4)
+    }
+
+    @Test
+    fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
+        rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
+            expect(1)
+            throw TestException()
+        }.publish()
+            .refCount()
+            .subscribe({
+                expectUnreached()
+            }, {
+                expect(2) // Reported to onError
+            })
+        finish(3)
+    }
+
+    @Test
+    fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+        rxFlowable<Int>(Dispatchers.Unconfined) {
+            expect(1)
+            throw LinkageError()
+        }.publish()
+            .refCount()
+            .subscribe({
+                expectUnreached()
+            }, {
+                expect(2)
+            })
+        finish(4)
+    }
+
+    @Test
+    fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+        rxFlowable(Dispatchers.Unconfined) {
+            expect(1)
+            send(Unit)
+        }.subscribe({
+            expect(2)
+            throw LinkageError()
+        }, { expect(3) }) // Fatal exception is reported to both onError and CEH
+        finish(5)
+    }
+
+    @Test
+    fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
+        rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
+            expect(1)
+            send(Unit)
+        }.subscribe({
+            expect(2)
+            throw TestException()
+        }, { expect(3) }) // not reported to onError because came from the subscribe itself
+        finish(4)
+    }
+
+    @Test
+    fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
+        rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
+            expect(1)
+            send(Unit)
+        }.publish()
+            .refCount()
+            .subscribe({
+                expect(2)
+                throw RuntimeException()
+            }, { expect(3) })
+        finish(4)
+    }
+
+    @Test
+    fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
+        rxFlowable(Dispatchers.Unconfined) {
+            expect(1)
+            send(Unit)
+        }.publish()
+            .refCount()
+            .subscribe({
+                expect(2)
+                throw LinkageError()
+            }, { expectUnreached() })
+        finish(4)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowableTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowableTest.kt
new file mode 100644
index 0000000..746d6e8
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowableTest.kt
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.reactive.*
+import org.junit.Test
+import kotlin.test.*
+
+class FlowableTest : TestBase() {
+    @Test
+    fun testBasicSuccess() = runBlocking {
+        expect(1)
+        val observable = rxFlowable(currentDispatcher()) {
+            expect(4)
+            send("OK")
+        }
+        expect(2)
+        observable.subscribe { value ->
+            expect(5)
+            assertEquals("OK", value)
+        }
+        expect(3)
+        yield() // to started coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicFailure() = runBlocking {
+        expect(1)
+        val observable = rxFlowable<String>(currentDispatcher()) {
+            expect(4)
+            throw RuntimeException("OK")
+        }
+        expect(2)
+        observable.subscribe({
+            expectUnreached()
+        }, { error ->
+            expect(5)
+            assertTrue(error is RuntimeException)
+            assertEquals("OK", error.message)
+        })
+        expect(3)
+        yield() // to started coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicUnsubscribe() = runBlocking {
+        expect(1)
+        val observable = rxFlowable<String>(currentDispatcher()) {
+            expect(4)
+            yield() // back to main, will get cancelled
+            expectUnreached()
+        }
+        expect(2)
+        val sub = observable.subscribe({
+            expectUnreached()
+        }, {
+            expectUnreached()
+        })
+        expect(3)
+        yield() // to started coroutine
+        expect(5)
+        sub.dispose() // will cancel coroutine
+        yield()
+        finish(6)
+    }
+
+    @Test
+    fun testNotifyOnceOnCancellation() = runTest {
+        expect(1)
+        val observable =
+            rxFlowable(currentDispatcher()) {
+                expect(5)
+                send("OK")
+                try {
+                    delay(Long.MAX_VALUE)
+                } catch (e: CancellationException) {
+                    expect(11)
+                }
+            }
+            .doOnNext {
+                expect(6)
+                assertEquals("OK", it)
+            }
+            .doOnCancel {
+                expect(10) // notified once!
+            }
+        expect(2)
+        val job = launch(start = CoroutineStart.UNDISPATCHED) {
+            expect(3)
+            observable.collect {
+                expect(8)
+                assertEquals("OK", it)
+            }
+        }
+        expect(4)
+        yield() // to observable code
+        expect(7)
+        yield() // to consuming coroutines
+        expect(9)
+        job.cancel()
+        job.join()
+        finish(12)
+    }
+
+    @Test
+    fun testFailingConsumer() = runTest {
+        val pub = rxFlowable(currentDispatcher()) {
+            repeat(3) {
+                expect(it + 1) // expect(1), expect(2) *should* be invoked
+                send(it)
+            }
+        }
+        try {
+            pub.collect {
+                throw TestException()
+            }
+        } catch (e: TestException) {
+            finish(3)
+        }
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
new file mode 100644
index 0000000..395672c
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.consumeAsFlow
+import org.junit.Test
+import org.junit.runner.*
+import org.junit.runners.*
+import kotlin.coroutines.*
+import kotlin.test.*
+
+@RunWith(Parameterized::class)
+class IntegrationTest(
+    private val ctx: Ctx,
+    private val delay: Boolean
+) : TestBase() {
+
+    enum class Ctx {
+        MAIN        { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },
+        DEFAULT     { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },
+        UNCONFINED  { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };
+
+        abstract operator fun invoke(context: CoroutineContext): CoroutineContext
+    }
+
+    companion object {
+        @Parameterized.Parameters(name = "ctx={0}, delay={1}")
+        @JvmStatic
+        fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
+            listOf(false, true).map { delay ->
+                arrayOf(ctx, delay)
+            }
+        }
+    }
+
+    @Test
+    fun testEmpty(): Unit = runBlocking {
+        val observable = rxObservable<String>(ctx(coroutineContext)) {
+            if (delay) delay(1)
+            // does not send anything
+        }
+        assertFailsWith<NoSuchElementException> { observable.awaitFirst() }
+        assertEquals("OK", observable.awaitFirstOrDefault("OK"))
+        assertNull(observable.awaitFirstOrNull())
+        assertEquals("ELSE", observable.awaitFirstOrElse { "ELSE" })
+        assertFailsWith<NoSuchElementException> { observable.awaitLast() }
+        assertFailsWith<NoSuchElementException> { observable.awaitSingle() }
+        var cnt = 0
+        observable.collect {
+            cnt++
+        }
+        assertEquals(0, cnt)
+    }
+
+    @Test
+    fun testSingle() = runBlocking {
+        val observable = rxObservable(ctx(coroutineContext)) {
+            if (delay) delay(1)
+            send("OK")
+        }
+        assertEquals("OK", observable.awaitFirst())
+        assertEquals("OK", observable.awaitFirstOrDefault("OK"))
+        assertEquals("OK", observable.awaitFirstOrNull())
+        assertEquals("OK", observable.awaitFirstOrElse { "ELSE" })
+        assertEquals("OK", observable.awaitLast())
+        assertEquals("OK", observable.awaitSingle())
+        var cnt = 0
+        observable.collect {
+            assertEquals("OK", it)
+            cnt++
+        }
+        assertEquals(1, cnt)
+    }
+
+    @Test
+    fun testNumbers() = runBlocking<Unit> {
+        val n = 100 * stressTestMultiplier
+        val observable = rxObservable(ctx(coroutineContext)) {
+            for (i in 1..n) {
+                send(i)
+                if (delay) delay(1)
+            }
+        }
+        assertEquals(1, observable.awaitFirst())
+        assertEquals(1, observable.awaitFirstOrDefault(0))
+        assertEquals(1, observable.awaitFirstOrNull())
+        assertEquals(1, observable.awaitFirstOrElse { 0 })
+        assertEquals(n, observable.awaitLast())
+        assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
+        checkNumbers(n, observable)
+        val channel = observable.openSubscription()
+        ctx(coroutineContext)
+        checkNumbers(n, channel.consumeAsFlow().asObservable())
+        channel.cancel()
+    }
+
+    @Test
+    fun testCancelWithoutValue() = runTest {
+        val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
+            rxObservable<String> {
+                hang {  }
+            }.awaitFirst()
+        }
+
+        job.cancel()
+        job.join()
+    }
+
+    @Test
+    fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
+        expect(1)
+        val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
+            rxObservable<String> {
+                yield()
+                expect(2)
+                // Nothing to emit
+            }.awaitFirst()
+        }
+
+        job.join()
+        finish(3)
+    }
+
+    private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
+        var last = 0
+        observable.collect {
+            assertEquals(++last, it)
+        }
+        assertEquals(n, last)
+    }
+
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/IterableFlowAsFlowableTckTest.kt b/reactive/kotlinx-coroutines-rx3/test/IterableFlowAsFlowableTckTest.kt
new file mode 100644
index 0000000..8e54922
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/IterableFlowAsFlowableTckTest.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.flow.*
+import org.junit.*
+import org.reactivestreams.*
+import org.reactivestreams.tck.*
+
+class IterableFlowAsFlowableTckTest : PublisherVerification<Long>(TestEnvironment()) {
+
+    private fun generate(num: Long): Array<Long> {
+        return Array(if (num >= Integer.MAX_VALUE) 1000000 else num.toInt()) { it.toLong() }
+    }
+
+    override fun createPublisher(elements: Long): Flowable<Long> {
+        return generate(elements).asIterable().asFlow().asFlowable()
+    }
+
+    override fun createFailedPublisher(): Publisher<Long>? = null
+
+    @Ignore
+    override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
+    }
+
+    @Ignore
+    override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
+    }
+
+    @Ignore
+    override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() {
+        //
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/LeakedExceptionTest.kt b/reactive/kotlinx-coroutines-rx3/test/LeakedExceptionTest.kt
new file mode 100644
index 0000000..028ded0
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/LeakedExceptionTest.kt
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.exceptions.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import org.junit.Test
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import kotlin.test.*
+
+// Check that exception is not leaked to the global exception handler
+class LeakedExceptionTest : TestBase() {
+
+    private val handler: (Throwable) -> Unit =
+        { assertTrue { it is UndeliverableException && it.cause is TestException } }
+
+    @Test
+    fun testSingle() = withExceptionHandler(handler) {
+        withFixedThreadPool(4) { dispatcher ->
+            val flow = rxSingle<Unit>(dispatcher) { throw TestException() }.toFlowable().asFlow()
+            runBlocking {
+                repeat(10000) {
+                    combine(flow, flow) { _, _ -> Unit }
+                        .catch {}
+                        .collect {}
+                }
+            }
+        }
+    }
+
+    @Test
+    fun testObservable() = withExceptionHandler(handler) {
+        withFixedThreadPool(4) { dispatcher ->
+            val flow = rxObservable<Unit>(dispatcher) { throw TestException() }
+                .toFlowable(BackpressureStrategy.BUFFER)
+                .asFlow()
+            runBlocking {
+                repeat(10000) {
+                    combine(flow, flow) { _, _ -> Unit }
+                        .catch {}
+                        .collect {}
+                }
+            }
+        }
+    }
+
+    @Test
+    fun testFlowable() = withExceptionHandler(handler) {
+        withFixedThreadPool(4) { dispatcher ->
+            val flow = rxFlowable<Unit>(dispatcher) { throw TestException() }.asFlow()
+            runBlocking {
+                repeat(10000) {
+                    combine(flow, flow) { _, _ -> Unit }
+                        .catch {}
+                        .collect {}
+                }
+            }
+        }
+    }
+
+    /**
+     * This test doesn't test much and was added to display a problem with straighforward use of
+     * [withExceptionHandler].
+     *
+     * If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context,
+     * this test would fail fairly often, while other tests were also vulnerable, but the problem is
+     * much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher`
+     * to other tests.
+     *
+     * See the commit that introduced this test for a better explanation.
+     */
+    @Test
+    fun testResettingExceptionHandler() = withExceptionHandler(handler) {
+        withFixedThreadPool(4) { dispatcher ->
+            val flow = rxFlowable<Unit>(dispatcher) {
+                if ((0..1).random() == 0) {
+                    Thread.sleep(100)
+                }
+                throw TestException()
+            }.asFlow()
+            runBlocking {
+                combine(flow, flow) { _, _ -> Unit }
+                    .catch {}
+                    .collect {}
+            }
+        }
+    }
+
+    /**
+     * Run in a thread pool, then wait for all the tasks to finish.
+     */
+    private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) {
+        val pool = Executors.newFixedThreadPool(numberOfThreads)
+        val dispatcher = pool.asCoroutineDispatcher()
+        block(dispatcher)
+        pool.shutdown()
+        while (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+            /* deliberately empty */
+        }
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt
new file mode 100644
index 0000000..e0cec74
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt
@@ -0,0 +1,316 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
+import io.reactivex.rxjava3.exceptions.*
+import io.reactivex.rxjava3.functions.*
+import io.reactivex.rxjava3.internal.functions.Functions.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+import java.util.concurrent.CancellationException
+import kotlin.test.*
+
+class MaybeTest : TestBase() {
+    @Before
+    fun setup() {
+        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+    }
+
+    @Test
+    fun testBasicSuccess() = runBlocking {
+        expect(1)
+        val maybe = rxMaybe(currentDispatcher()) {
+            expect(4)
+            "OK"
+        }
+        expect(2)
+        maybe.subscribe { value ->
+            expect(5)
+            assertEquals("OK", value)
+        }
+        expect(3)
+        yield() // to started coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicEmpty() = runBlocking {
+        expect(1)
+        val maybe = rxMaybe(currentDispatcher()) {
+            expect(4)
+            null
+        }
+        expect(2)
+        maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, Action {
+            expect(5)
+        })
+        expect(3)
+        yield() // to started coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicFailure() = runBlocking {
+        expect(1)
+        val maybe = rxMaybe(currentDispatcher()) {
+            expect(4)
+            throw RuntimeException("OK")
+        }
+        expect(2)
+        maybe.subscribe({
+            expectUnreached()
+        }, { error ->
+            expect(5)
+            assertTrue(error is RuntimeException)
+            assertEquals("OK", error.message)
+        })
+        expect(3)
+        yield() // to started coroutine
+        finish(6)
+    }
+
+
+    @Test
+    fun testBasicUnsubscribe() = runBlocking {
+        expect(1)
+        val maybe = rxMaybe(currentDispatcher()) {
+            expect(4)
+            yield() // back to main, will get cancelled
+            expectUnreached()
+        }
+        expect(2)
+        // nothing is called on a disposed rx3 maybe
+        val sub = maybe.subscribe({
+            expectUnreached()
+        }, {
+            expectUnreached()
+        })
+        expect(3)
+        yield() // to started coroutine
+        expect(5)
+        sub.dispose() // will cancel coroutine
+        yield()
+        finish(6)
+    }
+
+    @Test
+    fun testMaybeNoWait() {
+        val maybe = rxMaybe {
+            "OK"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testMaybeAwait() = runBlocking {
+        assertEquals("OK", Maybe.just("O").await() + "K")
+    }
+
+    @Test
+    fun testMaybeAwaitForNull() = runBlocking {
+        assertNull(Maybe.empty<String>().await())
+    }
+
+    @Test
+    fun testMaybeEmitAndAwait() {
+        val maybe = rxMaybe {
+            Maybe.just("O").await() + "K"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testMaybeWithDelay() {
+        val maybe = rxMaybe {
+            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testMaybeException() {
+        val maybe = rxMaybe {
+            Observable.just("O", "K").awaitSingle() + "K"
+        }
+
+        checkErroneous(maybe) {
+            assert(it is IllegalArgumentException)
+        }
+    }
+
+    @Test
+    fun testAwaitFirst() {
+        val maybe = rxMaybe {
+            Observable.just("O", "#").awaitFirst() + "K"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testAwaitLast() {
+        val maybe = rxMaybe {
+            Observable.just("#", "O").awaitLast() + "K"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testExceptionFromObservable() {
+        val maybe = rxMaybe {
+            try {
+                Observable.error<String>(RuntimeException("O")).awaitFirst()
+            } catch (e: RuntimeException) {
+                Observable.just(e.message!!).awaitLast() + "K"
+            }
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testExceptionFromCoroutine() {
+        val maybe = rxMaybe<String> {
+            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
+        }
+
+        checkErroneous(maybe) {
+            assert(it is IllegalStateException)
+            assertEquals("OK", it.message)
+        }
+    }
+
+    @Test
+    fun testCancelledConsumer() = runTest {
+        expect(1)
+        val maybe = rxMaybe<Int>(currentDispatcher()) {
+            expect(4)
+            try {
+                delay(Long.MAX_VALUE)
+            } catch (e: CancellationException) {
+                expect(6)
+            }
+            42
+        }
+        expect(2)
+        val timeout = withTimeoutOrNull(100) {
+            expect(3)
+            maybe.collect {
+                expectUnreached()
+            }
+            expectUnreached()
+        }
+        assertNull(timeout)
+        expect(5)
+        yield() // must cancel code inside maybe!!!
+        finish(7)
+    }
+
+    @Test
+    fun testSuppressedException() = runTest {
+        val maybe = rxMaybe(currentDispatcher()) {
+            launch(start = CoroutineStart.ATOMIC) {
+                throw TestException() // child coroutine fails
+            }
+            try {
+                delay(Long.MAX_VALUE)
+            } finally {
+                throw TestException2() // but parent throws another exception while cleaning up
+            }
+        }
+        try {
+            maybe.await()
+            expectUnreached()
+        } catch (e: TestException) {
+            assertTrue(e.suppressed[0] is TestException2)
+        }
+    }
+
+    @Test
+    fun testUnhandledException() = runTest {
+        expect(1)
+        var disposable: Disposable? = null
+        val handler = { e: Throwable ->
+            assertTrue(e is UndeliverableException && e.cause is TestException)
+            expect(5)
+        }
+        val maybe = rxMaybe(currentDispatcher()) {
+            expect(4)
+            disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
+            try {
+                delay(Long.MAX_VALUE)
+            } finally {
+                throw TestException() // would not be able to handle it since mono is disposed
+            }
+        }
+        withExceptionHandler(handler) {
+            maybe.subscribe(object : MaybeObserver<Unit> {
+                override fun onSubscribe(d: Disposable) {
+                    expect(2)
+                    disposable = d
+                }
+
+                override fun onComplete() {
+                    expectUnreached()
+                }
+
+                override fun onSuccess(t: Unit) {
+                    expectUnreached()
+                }
+
+                override fun onError(t: Throwable) {
+                    expectUnreached()
+                }
+            })
+            expect(3)
+            yield() // run coroutine
+            finish(6)
+        }
+    }
+
+    @Test
+    fun testFatalExceptionInSubscribe() = runTest {
+        val handler = { e: Throwable ->
+            assertTrue(e is UndeliverableException && e.cause is LinkageError)
+            expect(2)
+        }
+
+        withExceptionHandler(handler) {
+            rxMaybe(Dispatchers.Unconfined) {
+                expect(1)
+                42
+            }.subscribe({ throw LinkageError() })
+            finish(3)
+        }
+    }
+
+    @Test
+    fun testFatalExceptionInSingle() = runTest {
+        rxMaybe(Dispatchers.Unconfined) {
+            throw LinkageError()
+        }.subscribe({ expectUnreached()  }, { expect(1); assertTrue(it is LinkageError) })
+        finish(2)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableAsFlowTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableAsFlowTest.kt
new file mode 100644
index 0000000..262da9a
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableAsFlowTest.kt
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.Observable
+import io.reactivex.rxjava3.core.ObservableSource
+import io.reactivex.rxjava3.core.Observer
+import io.reactivex.rxjava3.disposables.Disposable
+import io.reactivex.rxjava3.subjects.PublishSubject
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+
+class ObservableAsFlowTest : TestBase() {
+    @Test
+    fun testCancellation() = runTest {
+        var onNext = 0
+        var onCancelled = 0
+        var onError = 0
+
+        val source = rxObservable(currentDispatcher()) {
+            coroutineContext[Job]?.invokeOnCompletion {
+                if (it is CancellationException) ++onCancelled
+            }
+
+            repeat(100) {
+                send(it)
+            }
+        }
+
+        source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
+            onEach {
+                ++onNext
+                throw RuntimeException()
+            }
+            catch<Throwable> {
+                ++onError
+            }
+        }.join()
+
+
+        assertEquals(1, onNext)
+        assertEquals(1, onError)
+        assertEquals(1, onCancelled)
+    }
+
+    @Test
+    fun testImmediateCollection() {
+        val source = PublishSubject.create<Int>()
+        val flow = source.asFlow()
+        GlobalScope.launch(Dispatchers.Unconfined) {
+            expect(1)
+            flow.collect { expect(it) }
+            expect(6)
+        }
+        expect(2)
+        source.onNext(3)
+        expect(4)
+        source.onNext(5)
+        source.onComplete()
+        finish(7)
+    }
+
+    @Test
+    fun testOnErrorCancellation() {
+        val source = PublishSubject.create<Int>()
+        val flow = source.asFlow()
+        val exception = RuntimeException()
+        GlobalScope.launch(Dispatchers.Unconfined) {
+            try {
+                expect(1)
+                flow.collect { expect(it) }
+                expectUnreached()
+            }
+            catch (e: Exception) {
+                assertSame(exception, e.cause)
+                expect(5)
+            }
+            expect(6)
+        }
+        expect(2)
+        source.onNext(3)
+        expect(4)
+        source.onError(exception)
+        finish(7)
+    }
+
+    @Test
+    fun testUnsubscribeOnCollectionException() {
+        val source = PublishSubject.create<Int>()
+        val flow = source.asFlow()
+        val exception = RuntimeException()
+        GlobalScope.launch(Dispatchers.Unconfined) {
+            try {
+                expect(1)
+                flow.collect {
+                    expect(it)
+                    if (it == 3) throw exception
+                }
+                expectUnreached()
+            }
+            catch (e: Exception) {
+                assertSame(exception, e.cause)
+                expect(4)
+            }
+            expect(5)
+        }
+        expect(2)
+        assertTrue(source.hasObservers())
+        source.onNext(3)
+        assertFalse(source.hasObservers())
+        finish(6)
+    }
+
+    @Test
+    fun testLateOnSubscribe() {
+        var observer: Observer<in Int>? = null
+        val source = ObservableSource<Int> { observer = it }
+        val flow = source.asFlow()
+        assertNull(observer)
+        val job = GlobalScope.launch(Dispatchers.Unconfined) {
+            expect(1)
+            flow.collect { expectUnreached() }
+            expectUnreached()
+        }
+        expect(2)
+        assertNotNull(observer)
+        job.cancel()
+        val disposable = Disposable.empty()
+        observer!!.onSubscribe(disposable)
+        assertTrue(disposable.isDisposed)
+        finish(3)
+    }
+
+    @Test
+    fun testBufferUnlimited() = runTest {
+        val source = rxObservable(currentDispatcher()) {
+            expect(1); send(10)
+            expect(2); send(11)
+            expect(3); send(12)
+            expect(4); send(13)
+            expect(5); send(14)
+            expect(6); send(15)
+            expect(7); send(16)
+            expect(8); send(17)
+            expect(9)
+        }
+        source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) }
+        finish(18)
+    }
+
+    @Test
+    fun testConflated() = runTest {
+        val source = Observable.range(1, 5)
+        val list = source.asFlow().conflate().toList()
+        assertEquals(listOf(1, 5), list)
+    }
+
+    @Test
+    fun testLongRange() = runTest {
+        val source = Observable.range(1, 10_000)
+        val count = source.asFlow().count()
+        assertEquals(10_000, count)
+    }
+
+    @Test
+    fun testProduce() = runTest {
+        val source = Observable.range(0, 10)
+        val flow = source.asFlow()
+        check((0..9).toList(), flow.produceIn(this))
+        check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
+        check((0..9).toList(), flow.buffer(2).produceIn(this))
+        check((0..9).toList(), flow.buffer(0).produceIn(this))
+        check(listOf(0, 9), flow.conflate().produceIn(this))
+    }
+
+    private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
+        val result = ArrayList<Int>(10)
+        channel.consumeEach { result.add(it) }
+        assertEquals(expected, result)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableCompletionStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableCompletionStressTest.kt
new file mode 100644
index 0000000..c1d25bc
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableCompletionStressTest.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import kotlinx.coroutines.*
+import org.junit.*
+import java.util.*
+import kotlin.coroutines.*
+
+class ObservableCompletionStressTest : TestBase() {
+    private val N_REPEATS = 10_000 * stressTestMultiplier
+
+    private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = rxObservable(context) {
+        for (x in start until start + count) send(x)
+    }
+
+    @Test
+    fun testCompletion() {
+        val rnd = Random()
+        repeat(N_REPEATS) {
+            val count = rnd.nextInt(5)
+            runBlocking {
+                withTimeout(5000) {
+                    var received = 0
+                    range(Dispatchers.Default, 1, count).collect { x ->
+                        received++
+                        if (x != received) error("$x != $received")
+                    }
+                    if (received != count) error("$received != $count")
+                }
+            }
+        }
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
new file mode 100644
index 0000000..1183b2a
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.exceptions.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import kotlin.test.*
+
+class ObservableExceptionHandlingTest : TestBase() {
+
+    @Before
+    fun setup() {
+        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+    }
+
+    private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
+        assertTrue(t is UndeliverableException && t.cause is T)
+        expect(expect)
+    }
+
+    private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() }
+
+    @Test
+    fun testException() = withExceptionHandler({ expectUnreached() }) {
+        rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
+            expect(1)
+            throw TestException()
+        }.subscribe({
+            expectUnreached()
+        }, {
+            expect(2) // Reported to onError
+        })
+        finish(3)
+    }
+
+    @Test
+    fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
+        rxObservable<Int>(Dispatchers.Unconfined) {
+            expect(1)
+            throw LinkageError()
+        }.subscribe({
+            expectUnreached()
+        }, {
+            expect(2)
+        })
+        finish(4)
+    }
+
+    @Test
+    fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
+        rxObservable<Int>(Dispatchers.Unconfined) {
+            expect(1)
+            throw TestException()
+        }.publish()
+            .refCount()
+            .subscribe({
+                expectUnreached()
+            }, {
+                expect(2) // Reported to onError
+            })
+        finish(3)
+    }
+
+    @Test
+    fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+        rxObservable<Int>(Dispatchers.Unconfined) {
+            expect(1)
+            throw LinkageError()
+        }.publish()
+            .refCount()
+            .subscribe({
+                expectUnreached()
+            }, {
+                expect(2) // Fatal exception is not reported in onError
+            })
+        finish(4)
+    }
+
+    @Test
+    fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+        rxObservable(Dispatchers.Unconfined) {
+            expect(1)
+            send(Unit)
+        }.subscribe({
+            expect(2)
+            throw LinkageError()
+        }, { expect(3) }) // Unreached because fatal errors are rethrown
+        finish(5)
+    }
+
+    @Test
+    fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
+        rxObservable(Dispatchers.Unconfined) {
+            expect(1)
+            send(Unit)
+        }.subscribe({
+            expect(2)
+            throw TestException()
+        }, { expect(3) }) // not reported to onError because came from the subscribe itself
+        finish(4)
+    }
+
+    @Test
+    fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
+        rxObservable(Dispatchers.Unconfined) {
+            expect(1)
+            send(Unit)
+        }.publish()
+            .refCount()
+            .subscribe({
+                expect(2)
+                throw RuntimeException()
+            }, { expect(3) })
+        finish(4)
+    }
+
+    @Test
+    fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+        rxObservable(Dispatchers.Unconfined) {
+            expect(1)
+            send(Unit)
+        }.publish()
+            .refCount()
+            .subscribe({
+                expect(2)
+                throw LinkageError()
+            }, { expect(3) })
+        finish(5)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt
new file mode 100644
index 0000000..b4adf7a
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import org.junit.Test
+import java.io.*
+import kotlin.test.*
+
+/**
+ * Test emitting multiple values with [rxObservable].
+ */
+class ObservableMultiTest : TestBase() {
+    @Test
+    fun testNumbers() {
+        val n = 100 * stressTestMultiplier
+        val observable = rxObservable {
+            repeat(n) { send(it) }
+        }
+        checkSingleValue(observable.toList()) { list ->
+            assertEquals((0 until n).toList(), list)
+        }
+    }
+
+
+    @Test
+    fun testConcurrentStress() {
+        val n = 10_000 * stressTestMultiplier
+        val observable = rxObservable {
+            newCoroutineContext(coroutineContext)
+            // concurrent emitters (many coroutines)
+            val jobs = List(n) {
+                // launch
+                launch {
+                    val i = it
+                    send(i)
+                }
+            }
+            jobs.forEach { it.join() }
+        }
+        checkSingleValue(observable.toList()) { list ->
+            assertEquals(n, list.size)
+            assertEquals((0 until n).toList(), list.sorted())
+        }
+    }
+
+    @Test
+    fun testIteratorResendUnconfined() {
+        val n = 10_000 * stressTestMultiplier
+        val observable = rxObservable(Dispatchers.Unconfined) {
+            Observable.range(0, n).collect { send(it) }
+        }
+        checkSingleValue(observable.toList()) { list ->
+            assertEquals((0 until n).toList(), list)
+        }
+    }
+
+    @Test
+    fun testIteratorResendPool() {
+        val n = 10_000 * stressTestMultiplier
+        val observable = rxObservable {
+            Observable.range(0, n).collect { send(it) }
+        }
+        checkSingleValue(observable.toList()) { list ->
+            assertEquals((0 until n).toList(), list)
+        }
+    }
+
+    @Test
+    fun testSendAndCrash() {
+        val observable = rxObservable {
+            send("O")
+            throw IOException("K")
+        }
+        val single = rxSingle {
+            var result = ""
+            try {
+                observable.collect { result += it }
+            } catch(e: IOException) {
+                result += e.message
+            }
+            result
+        }
+        checkSingleValue(single) {
+            assertEquals("OK", it)
+        }
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
new file mode 100644
index 0000000..2a3ce04
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+import kotlin.test.*
+
+class ObservableSingleTest : TestBase() {
+    @Before
+    fun setup() {
+        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+    }
+
+    @Test
+    fun testSingleNoWait() {
+        val observable = rxObservable {
+            send("OK")
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testSingleAwait() = runBlocking {
+        assertEquals("OK", Observable.just("O").awaitSingle() + "K")
+    }
+
+    @Test
+    fun testSingleEmitAndAwait() {
+        val observable = rxObservable {
+            send(Observable.just("O").awaitSingle() + "K")
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testSingleWithDelay() {
+        val observable = rxObservable {
+            send(Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K")
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testSingleException() {
+        val observable = rxObservable {
+            send(Observable.just("O", "K").awaitSingle() + "K")
+        }
+
+        checkErroneous(observable) {
+            assertTrue(it is IllegalArgumentException)
+        }
+    }
+
+    @Test
+    fun testAwaitFirst() {
+        val observable = rxObservable {
+            send(Observable.just("O", "#").awaitFirst() + "K")
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testAwaitFirstOrDefault() {
+        val observable = rxObservable {
+            send(Observable.empty<String>().awaitFirstOrDefault("O") + "K")
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testAwaitFirstOrDefaultWithValues() {
+        val observable = rxObservable {
+            send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K")
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testAwaitFirstOrNull() {
+        val observable = rxObservable<String> {
+            send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testAwaitFirstOrNullWithValues() {
+        val observable = rxObservable {
+            send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testAwaitFirstOrElse() {
+        val observable = rxObservable {
+            send(Observable.empty<String>().awaitFirstOrElse { "O" } + "K")
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testAwaitFirstOrElseWithValues() {
+        val observable = rxObservable {
+            send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K")
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testAwaitLast() {
+        val observable = rxObservable {
+            send(Observable.just("#", "O").awaitLast() + "K")
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testExceptionFromObservable() {
+        val observable = rxObservable {
+            try {
+                send(Observable.error<String>(RuntimeException("O")).awaitFirst())
+            } catch (e: RuntimeException) {
+                send(Observable.just(e.message!!).awaitLast() + "K")
+            }
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testExceptionFromCoroutine() {
+        val observable = rxObservable<String> {
+            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
+        }
+
+        checkErroneous(observable) {
+            assertTrue(it is IllegalStateException)
+            assertEquals("OK", it.message)
+        }
+    }
+
+    @Test
+    fun testObservableIteration() {
+        val observable = rxObservable {
+            var result = ""
+            Observable.just("O", "K").collect { result += it }
+            send(result)
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testObservableIterationFailure() {
+        val observable = rxObservable {
+            try {
+                Observable.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
+                send("Fail")
+            } catch (e: RuntimeException) {
+                send(e.message!!)
+            }
+        }
+
+        checkSingleValue(observable) {
+            assertEquals("OK", it)
+        }
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt
new file mode 100644
index 0000000..2f04316
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.selects.*
+import org.junit.Test
+import kotlin.test.*
+
+class ObservableSubscriptionSelectTest : TestBase() {
+    @Test
+    fun testSelect() = runTest {
+        // source with n ints
+        val n = 1000 * stressTestMultiplier
+        val source = rxObservable { repeat(n) { send(it) } }
+        var a = 0
+        var b = 0
+        // open two subs
+        val channelA = source.openSubscription()
+        val channelB = source.openSubscription()
+        loop@ while (true) {
+            val done: Int = select {
+                channelA.onReceiveOrNull {
+                    if (it != null) assertEquals(a++, it)
+                    if (it == null) 0 else 1
+                }
+                channelB.onReceiveOrNull {
+                    if (it != null) assertEquals(b++, it)
+                    if (it == null) 0 else 2
+                }
+            }
+            when (done) {
+                0 -> break@loop
+                1 -> {
+                    val r = channelB.receiveOrNull()
+                    if (r != null) assertEquals(b++, r)
+                }
+                2 -> {
+                    val r = channelA.receiveOrNull()
+                    if (r != null) assertEquals(a++, r)
+                }
+            }
+        }
+        channelA.cancel()
+        channelB.cancel()
+        // should receive one of them fully
+        assertTrue(a == n || b == n)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt
new file mode 100644
index 0000000..c6a6be5
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.plugins.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+import kotlin.test.*
+
+class ObservableTest : TestBase() {
+    @Before
+    fun setup() {
+        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+    }
+
+    @Test
+    fun testBasicSuccess() = runBlocking {
+        expect(1)
+        val observable = rxObservable(currentDispatcher()) {
+            expect(4)
+            send("OK")
+        }
+        expect(2)
+        observable.subscribe { value ->
+            expect(5)
+            assertEquals("OK", value)
+        }
+        expect(3)
+        yield() // to started coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicFailure() = runBlocking {
+        expect(1)
+        val observable = rxObservable<String>(currentDispatcher()) {
+            expect(4)
+            throw RuntimeException("OK")
+        }
+        expect(2)
+        observable.subscribe({
+            expectUnreached()
+        }, { error ->
+            expect(5)
+            assertTrue(error is RuntimeException)
+            assertEquals("OK", error.message)
+        })
+        expect(3)
+        yield() // to started coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicUnsubscribe() = runBlocking {
+        expect(1)
+        val observable = rxObservable<String>(currentDispatcher()) {
+            expect(4)
+            yield() // back to main, will get cancelled
+            expectUnreached()
+        }
+        expect(2)
+        val sub = observable.subscribe({
+            expectUnreached()
+        }, {
+            expectUnreached()
+        })
+        expect(3)
+        yield() // to started coroutine
+        expect(5)
+        sub.dispose() // will cancel coroutine
+        yield()
+        finish(6)
+    }
+
+    @Test
+    fun testNotifyOnceOnCancellation() = runTest {
+        expect(1)
+        val observable =
+            rxObservable(currentDispatcher()) {
+                expect(5)
+                send("OK")
+                try {
+                    delay(Long.MAX_VALUE)
+                } catch (e: CancellationException) {
+                    expect(11)
+                }
+            }
+            .doOnNext {
+                expect(6)
+                assertEquals("OK", it)
+            }
+            .doOnDispose {
+                expect(10) // notified once!
+            }
+        expect(2)
+        val job = launch(start = CoroutineStart.UNDISPATCHED) {
+            expect(3)
+            observable.collect {
+                expect(8)
+                assertEquals("OK", it)
+            }
+        }
+        expect(4)
+        yield() // to observable code
+        expect(7)
+        yield() // to consuming coroutines
+        expect(9)
+        job.cancel()
+        job.join()
+        finish(12)
+    }
+
+    @Test
+    fun testFailingConsumer() = runTest {
+        expect(1)
+        val pub = rxObservable(currentDispatcher()) {
+            expect(2)
+            send("OK")
+            try {
+                delay(Long.MAX_VALUE)
+            } catch (e: CancellationException) {
+                finish(5)
+            }
+        }
+        try {
+            pub.collect {
+                expect(3)
+                throw TestException()
+            }
+        } catch (e: TestException) {
+            expect(4)
+        }
+    }
+
+    @Test
+    fun testExceptionAfterCancellation() {
+        // Test that no exceptions were reported to the global EH (it will fail the test if so)
+        val handler = { e: Throwable ->
+            assertFalse(e is CancellationException)
+        }
+        withExceptionHandler(handler) {
+            RxJavaPlugins.setErrorHandler {
+                require(it !is CancellationException)
+            }
+            Observable
+                .interval(1, TimeUnit.MILLISECONDS)
+                .take(1000)
+                .switchMapSingle {
+                    rxSingle {
+                        timeBomb().await()
+                    }
+                }
+                .blockingSubscribe({}, {})
+        }
+    }
+
+    private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt b/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt
new file mode 100644
index 0000000..9e95c21
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.schedulers.Schedulers
+import kotlinx.coroutines.*
+import org.junit.Before
+import org.junit.Test
+import kotlin.test.*
+
+class SchedulerTest : TestBase() {
+    @Before
+    fun setup() {
+        ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+    }
+
+    @Test
+    fun testIoScheduler(): Unit = runBlocking {
+        expect(1)
+        val mainThread = Thread.currentThread()
+        withContext(Schedulers.io().asCoroutineDispatcher()) {
+            val t1 = Thread.currentThread()
+            assertNotSame(t1, mainThread)
+            expect(2)
+            delay(100)
+            val t2 = Thread.currentThread()
+            assertNotSame(t2, mainThread)
+            expect(3)
+        }
+        finish(4)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
new file mode 100644
index 0000000..46bcaf8
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
+import io.reactivex.rxjava3.exceptions.*
+import io.reactivex.rxjava3.functions.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+import kotlin.test.*
+
+class SingleTest : TestBase() {
+    @Before
+    fun setup() {
+        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+    }
+
+    @Test
+    fun testBasicSuccess() = runBlocking {
+        expect(1)
+        val single = rxSingle(currentDispatcher()) {
+            expect(4)
+            "OK"
+        }
+        expect(2)
+        single.subscribe { value ->
+            expect(5)
+            assertEquals("OK", value)
+        }
+        expect(3)
+        yield() // to started coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicFailure() = runBlocking {
+        expect(1)
+        val single = rxSingle(currentDispatcher()) {
+            expect(4)
+            throw RuntimeException("OK")
+        }
+        expect(2)
+        single.subscribe({
+            expectUnreached()
+        }, { error ->
+            expect(5)
+            assertTrue(error is RuntimeException)
+            assertEquals("OK", error.message)
+        })
+        expect(3)
+        yield() // to started coroutine
+        finish(6)
+    }
+
+
+    @Test
+    fun testBasicUnsubscribe() = runBlocking {
+        expect(1)
+        val single = rxSingle(currentDispatcher()) {
+            expect(4)
+            yield() // back to main, will get cancelled
+            expectUnreached()
+
+        }
+        expect(2)
+        // nothing is called on a disposed rx3 single
+        val sub = single.subscribe({
+            expectUnreached()
+        }, {
+            expectUnreached()
+        })
+        expect(3)
+        yield() // to started coroutine
+        expect(5)
+        sub.dispose() // will cancel coroutine
+        yield()
+        finish(6)
+    }
+
+    @Test
+    fun testSingleNoWait() {
+        val single = rxSingle {
+            "OK"
+        }
+
+        checkSingleValue(single) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testSingleAwait() = runBlocking {
+        assertEquals("OK", Single.just("O").await() + "K")
+    }
+
+    @Test
+    fun testSingleEmitAndAwait() {
+        val single = rxSingle {
+            Single.just("O").await() + "K"
+        }
+
+        checkSingleValue(single) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testSingleWithDelay() {
+        val single = rxSingle {
+            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
+        }
+
+        checkSingleValue(single) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testSingleException() {
+        val single = rxSingle {
+            Observable.just("O", "K").awaitSingle() + "K"
+        }
+
+        checkErroneous(single) {
+            assert(it is IllegalArgumentException)
+        }
+    }
+
+    @Test
+    fun testAwaitFirst() {
+        val single = rxSingle {
+            Observable.just("O", "#").awaitFirst() + "K"
+        }
+
+        checkSingleValue(single) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testAwaitLast() {
+        val single = rxSingle {
+            Observable.just("#", "O").awaitLast() + "K"
+        }
+
+        checkSingleValue(single) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testExceptionFromObservable() {
+        val single = rxSingle {
+            try {
+                Observable.error<String>(RuntimeException("O")).awaitFirst()
+            } catch (e: RuntimeException) {
+                Observable.just(e.message!!).awaitLast() + "K"
+            }
+        }
+
+        checkSingleValue(single) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testExceptionFromCoroutine() {
+        val single = rxSingle<String> {
+            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
+        }
+
+        checkErroneous(single) {
+            assert(it is IllegalStateException)
+            assertEquals("OK", it.message)
+        }
+    }
+
+    @Test
+    fun testSuppressedException() = runTest {
+        val single = rxSingle(currentDispatcher()) {
+            launch(start = CoroutineStart.ATOMIC) {
+                throw TestException() // child coroutine fails
+            }
+            try {
+                delay(Long.MAX_VALUE)
+            } finally {
+                throw TestException2() // but parent throws another exception while cleaning up
+            }
+        }
+        try {
+            single.await()
+            expectUnreached()
+        } catch (e: TestException) {
+            assertTrue(e.suppressed[0] is TestException2)
+        }
+    }
+
+    @Test
+    fun testFatalExceptionInSubscribe() = runTest {
+        val handler = { e: Throwable ->
+            assertTrue(e is UndeliverableException && e.cause is LinkageError)
+            expect(2)
+        }
+        withExceptionHandler(handler) {
+            rxSingle(Dispatchers.Unconfined) {
+                expect(1)
+                42
+            }.subscribe(Consumer {
+                throw LinkageError()
+            })
+            finish(3)
+        }
+    }
+
+    @Test
+    fun testFatalExceptionInSingle() = runTest {
+        rxSingle(Dispatchers.Unconfined) {
+            throw LinkageError()
+        }.subscribe({ _, e ->  assertTrue(e is LinkageError); expect(1) })
+
+        finish(2)
+    }
+
+    @Test
+    fun testUnhandledException() = runTest {
+        expect(1)
+        var disposable: Disposable? = null
+        val handler = { e: Throwable ->
+            assertTrue(e is UndeliverableException && e.cause is TestException)
+            expect(5)
+        }
+        val single = rxSingle(currentDispatcher()) {
+            expect(4)
+            disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
+            try {
+                delay(Long.MAX_VALUE)
+            } finally {
+                throw TestException() // would not be able to handle it since mono is disposed
+            }
+        }
+        withExceptionHandler(handler) {
+            single.subscribe(object : SingleObserver<Unit> {
+                override fun onSubscribe(d: Disposable) {
+                    expect(2)
+                    disposable = d
+                }
+
+                override fun onSuccess(t: Unit) {
+                    expectUnreached()
+                }
+
+                override fun onError(t: Throwable) {
+                    expectUnreached()
+                }
+            })
+            expect(3)
+            yield() // run coroutine
+            finish(6)
+        }
+    }
+}
diff --git a/settings.gradle b/settings.gradle
index 95fcd7c..759628f 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -34,6 +34,7 @@
 module('reactive/kotlinx-coroutines-reactor')
 module('reactive/kotlinx-coroutines-jdk9')
 module('reactive/kotlinx-coroutines-rx2')
+module('reactive/kotlinx-coroutines-rx3')
 module('ui/kotlinx-coroutines-android')
 module('ui/kotlinx-coroutines-android/android-unit-tests')
 module('ui/kotlinx-coroutines-javafx')