Integration with RxJava3 (#1950)
Fixes #1883
Co-authored-by: Zac Sweers <zsweers@slack-corp.com>
diff --git a/README.md b/README.md
index 62a7dc8..d829dfe 100644
--- a/README.md
+++ b/README.md
@@ -47,6 +47,7 @@
* Reactive Streams ([Publisher.collect], [Publisher.awaitSingle], [publish], etc),
* Flow (JDK 9) (the same interface as for Reactive Streams),
* RxJava 2.x ([rxFlowable], [rxSingle], etc), and
+ * RxJava 3.x ([rxFlowable], [rxSingle], etc), and
* Project Reactor ([flux], [mono], etc).
* [ui](ui/README.md) — modules that provide coroutine dispatchers for various single-threaded UI libraries:
* Android, JavaFX, and Swing.
@@ -288,6 +289,8 @@
<!--- INDEX kotlinx.coroutines.rx2 -->
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-single.html
+<!--- MODULE kotlinx-coroutines-rx2 -->
+<!--- INDEX kotlinx.coroutines.rx2 -->
<!--- MODULE kotlinx-coroutines-reactor -->
<!--- INDEX kotlinx.coroutines.reactor -->
[flux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/flux.html
diff --git a/docs/flow.md b/docs/flow.md
index 705f338..37e491c 100644
--- a/docs/flow.md
+++ b/docs/flow.md
@@ -1786,7 +1786,7 @@
be Kotlin and suspension friendly and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article.
While being different, conceptually, Flow *is* a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa.
-Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2` for RxJava2).
+Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2`/`kotlinx-coroutines-rx3` for RxJava2/RxJava3).
Integration modules include conversions from and to `Flow`, integration with Reactor's `Context` and suspension-friendly ways to work with various reactive entities.
<!-- stdlib references -->
diff --git a/gradle.properties b/gradle.properties
index 56a1a23..233ad36 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -18,6 +18,7 @@
reactor_vesion=3.2.5.RELEASE
reactive_streams_version=1.0.2
rxjava2_version=2.2.8
+rxjava3_version=3.0.2
javafx_version=11.0.2
javafx_plugin_version=0.0.8
binary_compatibility_validator_version=0.2.2
diff --git a/reactive/README.md b/reactive/README.md
index 8679a2b..35706ac 100644
--- a/reactive/README.md
+++ b/reactive/README.md
@@ -8,3 +8,4 @@
* [kotlinx-coroutines-reactive](kotlinx-coroutines-reactive/README.md) -- utilities for [Reactive Streams](https://www.reactive-streams.org)
* [kotlinx-coroutines-reactor](kotlinx-coroutines-reactor/README.md) -- utilities for [Reactor](https://projectreactor.io)
* [kotlinx-coroutines-rx2](kotlinx-coroutines-rx2/README.md) -- utilities for [RxJava 2.x](https://github.com/ReactiveX/RxJava)
+* [kotlinx-coroutines-rx3](kotlinx-coroutines-rx3/README.md) -- utilities for [RxJava 3.x](https://github.com/ReactiveX/RxJava)
diff --git a/reactive/kotlinx-coroutines-rx2/build.gradle b/reactive/kotlinx-coroutines-rx2/build.gradle
index b583025..6d2c4ab 100644
--- a/reactive/kotlinx-coroutines-rx2/build.gradle
+++ b/reactive/kotlinx-coroutines-rx2/build.gradle
@@ -11,7 +11,7 @@
tasks.withType(dokka.getClass()) {
externalDocumentationLink {
- url = new URL('http://reactivex.io/RxJava/javadoc/')
+ url = new URL('http://reactivex.io/RxJava/2.x/javadoc/')
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/README.md b/reactive/kotlinx-coroutines-rx3/README.md
new file mode 100644
index 0000000..3aa73eb
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/README.md
@@ -0,0 +1,84 @@
+# Module kotlinx-coroutines-rx3
+
+Utilities for [RxJava 3.x](https://github.com/ReactiveX/RxJava).
+
+Coroutine builders:
+
+| **Name** | **Result** | **Scope** | **Description**
+| --------------- | --------------------------------------- | ---------------- | ---------------
+| [rxCompletable] | `Completable` | [CoroutineScope] | Cold completable that starts coroutine on subscribe
+| [rxMaybe] | `Maybe` | [CoroutineScope] | Cold maybe that starts coroutine on subscribe
+| [rxSingle] | `Single` | [CoroutineScope] | Cold single that starts coroutine on subscribe
+| [rxObservable] | `Observable` | [ProducerScope] | Cold observable that starts coroutine on subscribe
+| [rxFlowable] | `Flowable` | [ProducerScope] | Cold observable that starts coroutine on subscribe with **backpressure** support
+
+Integration with [Flow]:
+
+| **Name** | **Result** | **Description**
+| --------------- | -------------- | ---------------
+| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable.
+| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable.
+| [ObservableSource.asFlow] | `Flow` | Converts the given cold ObservableSource to flow
+
+Suspending extension functions and suspending iteration:
+
+| **Name** | **Description**
+| -------- | ---------------
+| [CompletableSource.await][io.reactivex.rxjava3.core.CompletableSource.await] | Awaits for completion of the completable value
+| [MaybeSource.await][io.reactivex.rxjava3.core.MaybeSource.await] | Awaits for the value of the maybe and returns it or null
+| [MaybeSource.awaitOrDefault][io.reactivex.rxjava3.core.MaybeSource.awaitOrDefault] | Awaits for the value of the maybe and returns it or default
+| [SingleSource.await][io.reactivex.rxjava3.core.SingleSource.await] | Awaits for completion of the single value and returns it
+| [ObservableSource.awaitFirst][io.reactivex.rxjava3.core.ObservableSource.awaitFirst] | Awaits for the first value from the given observable
+| [ObservableSource.awaitFirstOrDefault][io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrDefault] | Awaits for the first value from the given observable or default
+| [ObservableSource.awaitFirstOrElse][io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrElse] | Awaits for the first value from the given observable or default from a function
+| [ObservableSource.awaitFirstOrNull][io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrNull] | Awaits for the first value from the given observable or null
+| [ObservableSource.awaitLast][io.reactivex.rxjava3.core.ObservableSource.awaitFirst] | Awaits for the last value from the given observable
+| [ObservableSource.awaitSingle][io.reactivex.rxjava3.core.ObservableSource.awaitSingle] | Awaits for the single value from the given observable
+
+Note that `Flowable` is a subclass of [Reactive Streams](https://www.reactive-streams.org)
+`Publisher` and extensions for it are covered by
+[kotlinx-coroutines-reactive](../kotlinx-coroutines-reactive) module.
+
+Conversion functions:
+
+| **Name** | **Description**
+| -------- | ---------------
+| [Job.asCompletable][kotlinx.coroutines.Job.asCompletable] | Converts job to hot completable
+| [Deferred.asSingle][kotlinx.coroutines.Deferred.asSingle] | Converts deferred value to hot single
+| [Scheduler.asCoroutineDispatcher][io.reactivex.rxjava3.core.Scheduler.asCoroutineDispatcher] | Converts scheduler to [CoroutineDispatcher]
+
+<!--- MODULE kotlinx-coroutines-core -->
+<!--- INDEX kotlinx.coroutines -->
+[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
+[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
+<!--- INDEX kotlinx.coroutines.channels -->
+[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-producer-scope/index.html
+<!--- INDEX kotlinx.coroutines.flow -->
+[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
+<!--- MODULE kotlinx-coroutines-rx3 -->
+<!--- INDEX kotlinx.coroutines.rx3 -->
+[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-completable.html
+[rxMaybe]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-maybe.html
+[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-single.html
+[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-observable.html
+[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-flowable.html
+[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.flow.-flow/as-flowable.html
+[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.flow.-flow/as-observable.html
+[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/as-flow.html
+[io.reactivex.rxjava3.core.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-completable-source/await.html
+[io.reactivex.rxjava3.core.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-maybe-source/await.html
+[io.reactivex.rxjava3.core.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-maybe-source/await-or-default.html
+[io.reactivex.rxjava3.core.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-single-source/await.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-default.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-else.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-null.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-single.html
+[kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.-job/as-completable.html
+[kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.-deferred/as-single.html
+[io.reactivex.rxjava3.core.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-scheduler/as-coroutine-dispatcher.html
+<!--- END -->
+
+# Package kotlinx.coroutines.rx3
+
+Utilities for [RxJava 3.x](https://github.com/ReactiveX/RxJava).
diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
new file mode 100644
index 0000000..27c3d3d
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
@@ -0,0 +1,70 @@
+public final class kotlinx/coroutines/rx3/RxAwaitKt {
+ public static final fun await (Lio/reactivex/rxjava3/core/CompletableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun await (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun await (Lio/reactivex/rxjava3/core/SingleSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun awaitFirst (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun awaitFirstOrDefault (Lio/reactivex/rxjava3/core/ObservableSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun awaitFirstOrElse (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun awaitFirstOrNull (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun awaitLast (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun awaitOrDefault (Lio/reactivex/rxjava3/core/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun awaitSingle (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+}
+
+public final class kotlinx/coroutines/rx3/RxChannelKt {
+ public static final fun collect (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun collect (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun openSubscription (Lio/reactivex/rxjava3/core/MaybeSource;)Lkotlinx/coroutines/channels/ReceiveChannel;
+ public static final fun openSubscription (Lio/reactivex/rxjava3/core/ObservableSource;)Lkotlinx/coroutines/channels/ReceiveChannel;
+}
+
+public final class kotlinx/coroutines/rx3/RxCompletableKt {
+ public static final fun rxCompletable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Completable;
+ public static synthetic fun rxCompletable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Completable;
+}
+
+public final class kotlinx/coroutines/rx3/RxConvertKt {
+ public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Completable;
+ public static final fun asFlow (Lio/reactivex/rxjava3/core/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Maybe;
+ public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Single;
+ public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
+ public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
+}
+
+public final class kotlinx/coroutines/rx3/RxFlowableKt {
+ public static final fun rxFlowable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Flowable;
+ public static synthetic fun rxFlowable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
+}
+
+public final class kotlinx/coroutines/rx3/RxMaybeKt {
+ public static final fun rxMaybe (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Maybe;
+ public static synthetic fun rxMaybe$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Maybe;
+}
+
+public final class kotlinx/coroutines/rx3/RxObservableKt {
+ public static final fun rxObservable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Observable;
+ public static synthetic fun rxObservable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
+}
+
+public final class kotlinx/coroutines/rx3/RxSchedulerKt {
+ public static final fun asCoroutineDispatcher (Lio/reactivex/rxjava3/core/Scheduler;)Lkotlinx/coroutines/rx3/SchedulerCoroutineDispatcher;
+}
+
+public final class kotlinx/coroutines/rx3/RxSingleKt {
+ public static final fun rxSingle (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/rxjava3/core/Single;
+ public static synthetic fun rxSingle$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Single;
+}
+
+public final class kotlinx/coroutines/rx3/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay {
+ public fun <init> (Lio/reactivex/rxjava3/core/Scheduler;)V
+ public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
+ public fun equals (Ljava/lang/Object;)Z
+ public final fun getScheduler ()Lio/reactivex/rxjava3/core/Scheduler;
+ public fun hashCode ()I
+ public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
+ public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
+ public fun toString ()Ljava/lang/String;
+}
+
diff --git a/reactive/kotlinx-coroutines-rx3/build.gradle b/reactive/kotlinx-coroutines-rx3/build.gradle
new file mode 100644
index 0000000..ced694a
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/build.gradle
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+targetCompatibility = JavaVersion.VERSION_1_8
+
+dependencies {
+ compile project(':kotlinx-coroutines-reactive')
+ testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output
+ testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
+ compile "io.reactivex.rxjava3:rxjava:$rxjava3_version"
+}
+
+compileTestKotlin {
+ kotlinOptions.jvmTarget = "1.8"
+}
+
+compileKotlin {
+ kotlinOptions.jvmTarget = "1.8"
+}
+
+tasks.withType(dokka.getClass()) {
+ externalDocumentationLink {
+ url = new URL('http://reactivex.io/RxJava/3.x/javadoc/')
+ packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
+ }
+}
+
+task testNG(type: Test) {
+ useTestNG()
+ reports.html.destination = file("$buildDir/reports/testng")
+ include '**/*ReactiveStreamTckTest.*'
+ // Skip testNG when tests are filtered with --tests, otherwise it simply fails
+ onlyIf {
+ filter.includePatterns.isEmpty()
+ }
+ doFirst {
+ // Classic gradle, nothing works without doFirst
+ println "TestNG tests: ($includes)"
+ }
+}
+
+test {
+ dependsOn(testNG)
+ reports.html.destination = file("$buildDir/reports/junit")
+}
diff --git a/reactive/kotlinx-coroutines-rx3/package.list b/reactive/kotlinx-coroutines-rx3/package.list
new file mode 100644
index 0000000..889916d
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/package.list
@@ -0,0 +1,14 @@
+io.reactivex.rxjava3.core
+io.reactivex.rxjava3.annotations
+io.reactivex.rxjava3.disposables
+io.reactivex.rxjava3.exceptions
+io.reactivex.rxjava3.flowables
+io.reactivex.rxjava3.functions
+io.reactivex.rxjava3.observables
+io.reactivex.rxjava3.observers
+io.reactivex.rxjava3.parallel
+io.reactivex.rxjava3.plugins
+io.reactivex.rxjava3.processors
+io.reactivex.rxjava3.schedulers
+io.reactivex.rxjava3.subjects
+io.reactivex.rxjava3.subscribers
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
new file mode 100644
index 0000000..e52556e
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.Disposable
+import kotlinx.coroutines.CancellableContinuation
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlin.coroutines.*
+
+// ------------------------ CompletableSource ------------------------
+
+/**
+ * Awaits for completion of this completable without blocking a thread.
+ * Returns `Unit` or throws the corresponding exception if this completable had produced error.
+ *
+ * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
+ * suspending function is suspended, this function immediately resumes with [CancellationException].
+ */
+public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
+ subscribe(object : CompletableObserver {
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+ override fun onComplete() { cont.resume(Unit) }
+ override fun onError(e: Throwable) { cont.resumeWithException(e) }
+ })
+}
+
+// ------------------------ MaybeSource ------------------------
+
+/**
+ * Awaits for completion of the maybe without blocking a thread.
+ * Returns the resulting value, null if no value was produced or throws the corresponding exception if this
+ * maybe had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+@Suppress("UNCHECKED_CAST")
+public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)
+
+/**
+ * Awaits for completion of the maybe without blocking a thread.
+ * Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this
+ * maybe had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
+ subscribe(object : MaybeObserver<T> {
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+ override fun onComplete() { cont.resume(default) }
+ override fun onSuccess(t: T) { cont.resume(t) }
+ override fun onError(error: Throwable) { cont.resumeWithException(error) }
+ })
+}
+
+// ------------------------ SingleSource ------------------------
+
+/**
+ * Awaits for completion of the single value without blocking a thread.
+ * Returns the resulting value or throws the corresponding exception if this single had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
+ subscribe(object : SingleObserver<T> {
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+ override fun onSuccess(t: T) { cont.resume(t) }
+ override fun onError(error: Throwable) { cont.resumeWithException(error) }
+ })
+}
+
+// ------------------------ ObservableSource ------------------------
+
+/**
+ * Awaits for the first value from the given observable without blocking a thread.
+ * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ *
+ * @throws NoSuchElementException if observable does not emit any value
+ */
+public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
+
+/**
+ * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
+ * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
+
+/**
+ * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
+ * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
+
+/**
+ * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
+ * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
+
+/**
+ * Awaits for the last value from the given observable without blocking a thread.
+ * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ *
+ * @throws NoSuchElementException if observable does not emit any value
+ */
+public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
+
+/**
+ * Awaits for the single value from the given observable without blocking a thread.
+ * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ *
+ * @throws NoSuchElementException if observable does not emit any value
+ * @throws IllegalArgumentException if observable emits more than one value
+ */
+public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
+
+// ------------------------ private ------------------------
+
+internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
+ invokeOnCancellation { d.dispose() }
+
+private enum class Mode(val s: String) {
+ FIRST("awaitFirst"),
+ FIRST_OR_DEFAULT("awaitFirstOrDefault"),
+ LAST("awaitLast"),
+ SINGLE("awaitSingle");
+ override fun toString(): String = s
+}
+
+private suspend fun <T> ObservableSource<T>.awaitOne(
+ mode: Mode,
+ default: T? = null
+): T = suspendCancellableCoroutine { cont ->
+ subscribe(object : Observer<T> {
+ private lateinit var subscription: Disposable
+ private var value: T? = null
+ private var seenValue = false
+
+ override fun onSubscribe(sub: Disposable) {
+ subscription = sub
+ cont.invokeOnCancellation { sub.dispose() }
+ }
+
+ override fun onNext(t: T) {
+ when (mode) {
+ Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
+ if (!seenValue) {
+ seenValue = true
+ cont.resume(t)
+ subscription.dispose()
+ }
+ }
+ Mode.LAST, Mode.SINGLE -> {
+ if (mode == Mode.SINGLE && seenValue) {
+ if (cont.isActive)
+ cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
+ subscription.dispose()
+ } else {
+ value = t
+ seenValue = true
+ }
+ }
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ override fun onComplete() {
+ if (seenValue) {
+ if (cont.isActive) cont.resume(value as T)
+ return
+ }
+ when {
+ mode == Mode.FIRST_OR_DEFAULT -> {
+ cont.resume(default as T)
+ }
+ cont.isActive -> {
+ cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
+ }
+ }
+ }
+
+ override fun onError(e: Throwable) {
+ cont.resumeWithException(e)
+ }
+ })
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
new file mode 100644
index 0000000..0b57b8b
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.functions.*
+import io.reactivex.rxjava3.plugins.*
+import kotlinx.coroutines.*
+import kotlin.coroutines.*
+
+internal class RxCancellable(private val job: Job) : Cancellable {
+ override fun cancel() {
+ job.cancel()
+ }
+}
+
+internal fun handleUndeliverableException(cause: Throwable, context: CoroutineContext) {
+ if (cause is CancellationException) return // Async CE should be completely ignored
+ try {
+ RxJavaPlugins.onError(cause)
+ } catch (e: Throwable) {
+ handleCoroutineException(context, cause)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
new file mode 100644
index 0000000..acb907b
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.internal.*
+
+/**
+ * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it.
+ * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
+ *
+ * This API is internal in the favour of [Flow].
+ * [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
+ */
+@PublishedApi
+internal fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
+ val channel = SubscriptionChannel<T>()
+ subscribe(channel)
+ return channel
+}
+
+/**
+ * Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it.
+ * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
+ *
+ * This API is internal in the favour of [Flow].
+ * [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
+ */
+@PublishedApi
+internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
+ val channel = SubscriptionChannel<T>()
+ subscribe(channel)
+ return channel
+}
+
+/**
+ * Subscribes to this [MaybeSource] and performs the specified action for each received element.
+ * Cancels subscription if any exception happens during collect.
+ */
+public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit =
+ openSubscription().consumeEach(action)
+
+/**
+ * Subscribes to this [ObservableSource] and performs the specified action for each received element.
+ * Cancels subscription if any exception happens during collect.
+ */
+public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
+ openSubscription().consumeEach(action)
+
+@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+private class SubscriptionChannel<T> :
+ LinkedListChannel<T>(), Observer<T>, MaybeObserver<T>
+{
+ private val _subscription = atomic<Disposable?>(null)
+
+ @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
+ override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
+ _subscription.getAndSet(null)?.dispose() // dispose exactly once
+ }
+
+ // Observer overrider
+ override fun onSubscribe(sub: Disposable) {
+ _subscription.value = sub
+ }
+
+ override fun onSuccess(t: T) {
+ offer(t)
+ }
+
+ override fun onNext(t: T) {
+ offer(t)
+ }
+
+ override fun onComplete() {
+ close(cause = null)
+ }
+
+ override fun onError(e: Throwable) {
+ close(cause = e)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
new file mode 100644
index 0000000..54b412f
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlin.coroutines.*
+
+/**
+ * Creates cold [Completable] that runs a given [block] in a coroutine and emits its result.
+ * Every time the returned completable is subscribed, it starts a new coroutine.
+ * Unsubscribing cancels running coroutine.
+ * Coroutine context can be specified with [context] argument.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ */
+public fun rxCompletable(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> Unit
+): Completable {
+ require(context[Job] === null) { "Completable context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return rxCompletableInternal(GlobalScope, context, block)
+}
+
+private fun rxCompletableInternal(
+ scope: CoroutineScope, // support for legacy rxCompletable in scope
+ context: CoroutineContext,
+ block: suspend CoroutineScope.() -> Unit
+): Completable = Completable.create { subscriber ->
+ val newContext = scope.newCoroutineContext(context)
+ val coroutine = RxCompletableCoroutine(newContext, subscriber)
+ subscriber.setCancellable(RxCancellable(coroutine))
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
+
+private class RxCompletableCoroutine(
+ parentContext: CoroutineContext,
+ private val subscriber: CompletableEmitter
+) : AbstractCoroutine<Unit>(parentContext, true) {
+ override fun onCompleted(value: Unit) {
+ try {
+ subscriber.onComplete()
+ } catch (e: Throwable) {
+ handleUndeliverableException(e, context)
+ }
+ }
+
+ override fun onCancelled(cause: Throwable, handled: Boolean) {
+ try {
+ if (!subscriber.tryOnError(cause)) {
+ handleUndeliverableException(cause, context)
+ }
+ } catch (e: Throwable) {
+ handleUndeliverableException(e, context)
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
new file mode 100644
index 0000000..f9e2e21
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import java.util.concurrent.atomic.*
+import kotlin.coroutines.*
+
+/**
+ * Converts this job to the hot reactive completable that signals
+ * with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
+ *
+ * Every subscriber gets the signal at the same time.
+ * Unsubscribing from the resulting completable **does not** affect the original job in any way.
+ *
+ * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
+ * in the future to account for the concept of structured concurrency.
+ *
+ * @param context -- the coroutine context from which the resulting completable is going to be signalled
+ */
+@ExperimentalCoroutinesApi
+public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
+ this@asCompletable.join()
+}
+
+/**
+ * Converts this deferred value to the hot reactive maybe that signals
+ * [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
+ *
+ * Every subscriber gets the same completion value.
+ * Unsubscribing from the resulting maybe **does not** affect the original deferred value in any way.
+ *
+ * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
+ * in the future to account for the concept of structured concurrency.
+ *
+ * @param context -- the coroutine context from which the resulting maybe is going to be signalled
+ */
+@ExperimentalCoroutinesApi
+public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
+ this@asMaybe.await()
+}
+
+/**
+ * Converts this deferred value to the hot reactive single that signals either
+ * [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
+ *
+ * Every subscriber gets the same completion value.
+ * Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
+ *
+ * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
+ * in the future to account for the concept of structured concurrency.
+ *
+ * @param context -- the coroutine context from which the resulting single is going to be signalled
+ */
+@ExperimentalCoroutinesApi
+public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) {
+ this@asSingle.await()
+}
+
+/**
+ * Transforms given cold [ObservableSource] into cold [Flow].
+ *
+ * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
+ * is applied to the resulting flow.
+ *
+ * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
+ * resulting flow to specify a user-defined value and to control what happens when data is produced faster
+ * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
+ */
+@ExperimentalCoroutinesApi
+public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
+ val disposableRef = AtomicReference<Disposable>()
+ val observer = object : Observer<T> {
+ override fun onComplete() { close() }
+ override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
+ override fun onNext(t: T) { sendBlocking(t) }
+ override fun onError(e: Throwable) { close(e) }
+ }
+
+ subscribe(observer)
+ awaitClose { disposableRef.getAndSet(Disposable.disposed())?.dispose() }
+}
+
+/**
+ * Converts the given flow to a cold observable.
+ * The original flow is cancelled when the observable subscriber is disposed.
+ */
+@JvmName("from")
+@ExperimentalCoroutinesApi
+public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create { emitter ->
+ /*
+ * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
+ * asObservable is already invoked from unconfined
+ */
+ val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
+ try {
+ collect { value -> emitter.onNext(value) }
+ emitter.onComplete()
+ } catch (e: Throwable) {
+ // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
+ if (e !is CancellationException) {
+ if (!emitter.tryOnError(e)) {
+ handleUndeliverableException(e, coroutineContext)
+ }
+ } else {
+ emitter.onComplete()
+ }
+ }
+ }
+ emitter.setCancellable(RxCancellable(job))
+}
+
+/**
+ * Converts the given flow to a cold flowable.
+ * The original flow is cancelled when the flowable subscriber is disposed.
+ */
+@JvmName("from")
+@ExperimentalCoroutinesApi
+public fun <T: Any> Flow<T>.asFlowable(): Flowable<T> = Flowable.fromPublisher(asPublisher())
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt
new file mode 100644
index 0000000..2de46a6
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.reactive.*
+import kotlin.coroutines.*
+
+/**
+ * Creates cold [flowable][Flowable] that will run a given [block] in a coroutine.
+ * Every time the returned flowable is subscribed, it starts a new coroutine.
+ *
+ * Coroutine emits ([ObservableEmitter.onNext]) values with `send`, completes ([ObservableEmitter.onComplete])
+ * when the coroutine completes or channel is explicitly closed and emits error ([ObservableEmitter.onError])
+ * if coroutine throws an exception or closes channel with a cause.
+ * Unsubscribing cancels running coroutine.
+ *
+ * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
+ * `onNext` is not invoked concurrently.
+ *
+ * Coroutine context can be specified with [context] argument.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ *
+ * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
+ */
+@ExperimentalCoroutinesApi
+public fun <T: Any> rxFlowable(
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Flowable<T> {
+ require(context[Job] === null) { "Flowable context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return Flowable.fromPublisher(publishInternal(GlobalScope, context, RX_HANDLER, block))
+}
+
+private val RX_HANDLER: (Throwable, CoroutineContext) -> Unit = ::handleUndeliverableException
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
new file mode 100644
index 0000000..4d55ef5
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlin.coroutines.*
+
+/**
+ * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine and emits its result.
+ * If [block] result is `null`, [onComplete][MaybeObserver.onComplete] is invoked without a value.
+ * Every time the returned observable is subscribed, it starts a new coroutine.
+ * Unsubscribing cancels running coroutine.
+ * Coroutine context can be specified with [context] argument.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ */
+public fun <T> rxMaybe(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> T?
+): Maybe<T> {
+ require(context[Job] === null) { "Maybe context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return rxMaybeInternal(GlobalScope, context, block)
+}
+
+private fun <T> rxMaybeInternal(
+ scope: CoroutineScope, // support for legacy rxMaybe in scope
+ context: CoroutineContext,
+ block: suspend CoroutineScope.() -> T?
+): Maybe<T> = Maybe.create { subscriber ->
+ val newContext = scope.newCoroutineContext(context)
+ val coroutine = RxMaybeCoroutine(newContext, subscriber)
+ subscriber.setCancellable(RxCancellable(coroutine))
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
+
+private class RxMaybeCoroutine<T>(
+ parentContext: CoroutineContext,
+ private val subscriber: MaybeEmitter<T>
+) : AbstractCoroutine<T>(parentContext, true) {
+ override fun onCompleted(value: T) {
+ try {
+ if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
+ } catch (e: Throwable) {
+ handleUndeliverableException(e, context)
+ }
+ }
+
+ override fun onCancelled(cause: Throwable, handled: Boolean) {
+ try {
+ if (!subscriber.tryOnError(cause)) {
+ handleUndeliverableException(cause, context)
+ }
+ } catch (e: Throwable) {
+ handleUndeliverableException(e, context)
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
new file mode 100644
index 0000000..102d06e
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.exceptions.*
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.selects.*
+import kotlinx.coroutines.sync.*
+import kotlin.coroutines.*
+
+/**
+ * Creates cold [observable][Observable] that will run a given [block] in a coroutine.
+ * Every time the returned observable is subscribed, it starts a new coroutine.
+ *
+ * Coroutine emits ([ObservableEmitter.onNext]) values with `send`, completes ([ObservableEmitter.onComplete])
+ * when the coroutine completes or channel is explicitly closed and emits error ([ObservableEmitter.onError])
+ * if coroutine throws an exception or closes channel with a cause.
+ * Unsubscribing cancels running coroutine.
+ *
+ * Invocations of `send` are suspended appropriately to ensure that `onNext` is not invoked concurrently.
+ * Note that Rx 2.x [Observable] **does not support backpressure**.
+ *
+ * Coroutine context can be specified with [context] argument.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ */
+@ExperimentalCoroutinesApi
+public fun <T : Any> rxObservable(
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Observable<T> {
+ require(context[Job] === null) { "Observable context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return rxObservableInternal(GlobalScope, context, block)
+}
+
+private fun <T : Any> rxObservableInternal(
+ scope: CoroutineScope, // support for legacy rxObservable in scope
+ context: CoroutineContext,
+ block: suspend ProducerScope<T>.() -> Unit
+): Observable<T> = Observable.create { subscriber ->
+ val newContext = scope.newCoroutineContext(context)
+ val coroutine = RxObservableCoroutine(newContext, subscriber)
+ subscriber.setCancellable(RxCancellable(coroutine)) // do it first (before starting coroutine), to await unnecessary suspensions
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
+
+private const val OPEN = 0 // open channel, still working
+private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet
+private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError
+
+private class RxObservableCoroutine<T: Any>(
+ parentContext: CoroutineContext,
+ private val subscriber: ObservableEmitter<T>
+) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
+ override val channel: SendChannel<T> get() = this
+
+ // Mutex is locked when while subscriber.onXXX is being invoked
+ private val mutex = Mutex()
+
+ private val _signal = atomic(OPEN)
+
+ override val isClosedForSend: Boolean get() = isCompleted
+ override val isFull: Boolean = mutex.isLocked
+ override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
+ override fun invokeOnClose(handler: (Throwable?) -> Unit) =
+ throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")
+
+ override fun offer(element: T): Boolean {
+ if (!mutex.tryLock()) return false
+ doLockedNext(element)
+ return true
+ }
+
+ public override suspend fun send(element: T) {
+ // fast-path -- try send without suspension
+ if (offer(element)) return
+ // slow-path does suspend
+ return sendSuspend(element)
+ }
+
+ private suspend fun sendSuspend(element: T) {
+ mutex.lock()
+ doLockedNext(element)
+ }
+
+ override val onSend: SelectClause2<T, SendChannel<T>>
+ get() = this
+
+ // registerSelectSend
+ @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
+ override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+ mutex.onLock.registerSelectClause2(select, null) {
+ doLockedNext(element)
+ block(this)
+ }
+ }
+
+ // assert: mutex.isLocked()
+ private fun doLockedNext(elem: T) {
+ // check if already closed for send
+ if (!isActive) {
+ doLockedSignalCompleted(completionCause, completionCauseHandled)
+ throw getCancellationException()
+ }
+ // notify subscriber
+ try {
+ subscriber.onNext(elem)
+ } catch (e: Throwable) {
+ // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
+ // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
+ // this failure is essentially equivalent to a failure of a child coroutine.
+ cancelCoroutine(e)
+ mutex.unlock()
+ throw e
+ }
+ /*
+ * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
+ * happen after this check and before `unlock` (see signalCompleted that does not do anything
+ * if it fails to acquire the lock that we are still holding).
+ * We have to recheck `isCompleted` after `unlock` anyway.
+ */
+ unlockAndCheckCompleted()
+ }
+
+ private fun unlockAndCheckCompleted() {
+ mutex.unlock()
+ // recheck isActive
+ if (!isActive && mutex.tryLock())
+ doLockedSignalCompleted(completionCause, completionCauseHandled)
+ }
+
+ // assert: mutex.isLocked()
+ private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
+ // cancellation failures
+ try {
+ if (_signal.value >= CLOSED) {
+ _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ try {
+ if (cause != null && cause !is CancellationException) {
+ /*
+ * Reactive frameworks have two types of exceptions: regular and fatal.
+ * Regular are passed to onError.
+ * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
+ * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
+ * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
+ * thrown by subscriber or upstream).
+ * To make behaviour consistent and least surprising, we always handle fatal exceptions
+ * by coroutines machinery, anyway, they should not be present in regular program flow,
+ * thus our goal here is just to expose it as soon as possible.
+ */
+ subscriber.tryOnError(cause)
+ if (!handled && cause.isFatal()) {
+ handleUndeliverableException(cause, context)
+ }
+ }
+ else {
+ subscriber.onComplete()
+ }
+ } catch (e: Throwable) {
+ // Unhandled exception (cannot handle in other way, since we are already complete)
+ handleUndeliverableException(e, context)
+ }
+ }
+ } finally {
+ mutex.unlock()
+ }
+ }
+
+ private fun signalCompleted(cause: Throwable?, handled: Boolean) {
+ if (!_signal.compareAndSet(OPEN, CLOSED)) return // abort, other thread invoked doLockedSignalCompleted
+ if (mutex.tryLock()) // if we can acquire the lock
+ doLockedSignalCompleted(cause, handled)
+ }
+
+ override fun onCompleted(value: Unit) {
+ signalCompleted(null, false)
+ }
+
+ override fun onCancelled(cause: Throwable, handled: Boolean) {
+ signalCompleted(cause, handled)
+ }
+}
+
+internal fun Throwable.isFatal() = try {
+ Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode
+ false
+} catch (e: Throwable) {
+ true
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt
new file mode 100644
index 0000000..6e91aee
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.Scheduler
+import kotlinx.coroutines.*
+import java.util.concurrent.TimeUnit
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
+ * and provides native support of [delay] and [withTimeout].
+ */
+public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this)
+
+/**
+ * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
+ */
+public class SchedulerCoroutineDispatcher(
+ /**
+ * Underlying scheduler of current [CoroutineDispatcher].
+ */
+ public val scheduler: Scheduler
+) : CoroutineDispatcher(), Delay {
+ /** @suppress */
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ scheduler.scheduleDirect(block)
+ }
+
+ /** @suppress */
+ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
+ val disposable = scheduler.scheduleDirect({
+ with(continuation) { resumeUndispatched(Unit) }
+ }, timeMillis, TimeUnit.MILLISECONDS)
+ continuation.disposeOnCancellation(disposable)
+ }
+
+ /** @suppress */
+ override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
+ val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS)
+ return DisposableHandle { disposable.dispose() }
+ }
+
+ /** @suppress */
+ override fun toString(): String = scheduler.toString()
+ /** @suppress */
+ override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler
+ /** @suppress */
+ override fun hashCode(): Int = System.identityHashCode(scheduler)
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
new file mode 100644
index 0000000..225df93
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlin.coroutines.*
+
+/**
+ * Creates cold [single][Single] that will run a given [block] in a coroutine and emits its result.
+ * Every time the returned observable is subscribed, it starts a new coroutine.
+ * Unsubscribing cancels running coroutine.
+ * Coroutine context can be specified with [context] argument.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ */
+public fun <T : Any> rxSingle(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> T
+): Single<T> {
+ require(context[Job] === null) { "Single context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return rxSingleInternal(GlobalScope, context, block)
+}
+
+private fun <T : Any> rxSingleInternal(
+ scope: CoroutineScope, // support for legacy rxSingle in scope
+ context: CoroutineContext,
+ block: suspend CoroutineScope.() -> T
+): Single<T> = Single.create { subscriber ->
+ val newContext = scope.newCoroutineContext(context)
+ val coroutine = RxSingleCoroutine(newContext, subscriber)
+ subscriber.setCancellable(RxCancellable(coroutine))
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
+
+private class RxSingleCoroutine<T: Any>(
+ parentContext: CoroutineContext,
+ private val subscriber: SingleEmitter<T>
+) : AbstractCoroutine<T>(parentContext, true) {
+ override fun onCompleted(value: T) {
+ try {
+ subscriber.onSuccess(value)
+ } catch (e: Throwable) {
+ handleUndeliverableException(e, context)
+ }
+ }
+
+ override fun onCancelled(cause: Throwable, handled: Boolean) {
+ try {
+ if (!subscriber.tryOnError(cause)) {
+ handleUndeliverableException(cause, context)
+ }
+ } catch (e: Throwable) {
+ handleUndeliverableException(e, context)
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-rx3/test/BackpressureTest.kt
new file mode 100644
index 0000000..9cec483
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/BackpressureTest.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import org.junit.Test
+import kotlin.test.*
+
+class BackpressureTest : TestBase() {
+ @Test
+ fun testBackpressureDropDirect() = runTest {
+ expect(1)
+ Flowable.fromArray(1)
+ .onBackpressureDrop()
+ .collect {
+ assertEquals(1, it)
+ expect(2)
+ }
+ finish(3)
+ }
+
+ @Test
+ fun testBackpressureDropFlow() = runTest {
+ expect(1)
+ Flowable.fromArray(1)
+ .onBackpressureDrop()
+ .asFlow()
+ .collect {
+ assertEquals(1, it)
+ expect(2)
+ }
+ finish(3)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/Check.kt b/reactive/kotlinx-coroutines-rx3/test/Check.kt
new file mode 100644
index 0000000..3d4704f
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/Check.kt
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.plugins.*
+
+fun <T> checkSingleValue(
+ observable: Observable<T>,
+ checker: (T) -> Unit
+) {
+ val singleValue = observable.blockingSingle()
+ checker(singleValue)
+}
+
+fun checkErroneous(
+ observable: Observable<*>,
+ checker: (Throwable) -> Unit
+) {
+ val singleNotification = observable.materialize().blockingSingle()
+ val error = singleNotification.error ?: error("Excepted error")
+ checker(error)
+}
+
+fun <T> checkSingleValue(
+ single: Single<T>,
+ checker: (T) -> Unit
+) {
+ val singleValue = single.blockingGet()
+ checker(singleValue)
+}
+
+fun checkErroneous(
+ single: Single<*>,
+ checker: (Throwable) -> Unit
+) {
+ try {
+ single.blockingGet()
+ error("Should have failed")
+ } catch (e: Throwable) {
+ checker(e)
+ }
+}
+
+fun <T> checkMaybeValue(
+ maybe: Maybe<T>,
+ checker: (T?) -> Unit
+) {
+ val maybeValue = maybe.toFlowable().blockingIterable().firstOrNull()
+ checker(maybeValue)
+}
+
+@Suppress("UNCHECKED_CAST")
+fun checkErroneous(
+ maybe: Maybe<*>,
+ checker: (Throwable) -> Unit
+) {
+ try {
+ (maybe as Maybe<Any>).blockingGet()
+ error("Should have failed")
+ } catch (e: Throwable) {
+ checker(e)
+ }
+}
+
+inline fun withExceptionHandler(noinline handler: (Throwable) -> Unit, block: () -> Unit) {
+ val original = RxJavaPlugins.getErrorHandler()
+ RxJavaPlugins.setErrorHandler { handler(it) }
+ try {
+ block()
+ } finally {
+ RxJavaPlugins.setErrorHandler(original)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt
new file mode 100644
index 0000000..e5399d1
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
+import io.reactivex.rxjava3.exceptions.*
+import kotlinx.coroutines.*
+import org.junit.Test
+import kotlin.test.*
+
+class CompletableTest : TestBase() {
+ @Test
+ fun testBasicSuccess() = runBlocking {
+ expect(1)
+ val completable = rxCompletable(currentDispatcher()) {
+ expect(4)
+ }
+ expect(2)
+ completable.subscribe {
+ expect(5)
+ }
+ expect(3)
+ yield() // to completable coroutine
+ finish(6)
+ }
+
+ @Test
+ fun testBasicFailure() = runBlocking {
+ expect(1)
+ val completable = rxCompletable(currentDispatcher()) {
+ expect(4)
+ throw RuntimeException("OK")
+ }
+ expect(2)
+ completable.subscribe({
+ expectUnreached()
+ }, { error ->
+ expect(5)
+ assertTrue(error is RuntimeException)
+ assertEquals("OK", error.message)
+ })
+ expect(3)
+ yield() // to completable coroutine
+ finish(6)
+ }
+
+ @Test
+ fun testBasicUnsubscribe() = runBlocking {
+ expect(1)
+ val completable = rxCompletable(currentDispatcher()) {
+ expect(4)
+ yield() // back to main, will get cancelled
+ expectUnreached()
+ }
+ expect(2)
+ // nothing is called on a disposed rx3 completable
+ val sub = completable.subscribe({
+ expectUnreached()
+ }, {
+ expectUnreached()
+ })
+ expect(3)
+ yield() // to started coroutine
+ expect(5)
+ sub.dispose() // will cancel coroutine
+ yield()
+ finish(6)
+ }
+
+ @Test
+ fun testAwaitSuccess() = runBlocking {
+ expect(1)
+ val completable = rxCompletable(currentDispatcher()) {
+ expect(3)
+ }
+ expect(2)
+ completable.await() // shall launch coroutine
+ finish(4)
+ }
+
+ @Test
+ fun testAwaitFailure() = runBlocking {
+ expect(1)
+ val completable = rxCompletable(currentDispatcher()) {
+ expect(3)
+ throw RuntimeException("OK")
+ }
+ expect(2)
+ try {
+ completable.await() // shall launch coroutine and throw exception
+ expectUnreached()
+ } catch (e: RuntimeException) {
+ finish(4)
+ assertEquals("OK", e.message)
+ }
+ }
+
+ @Test
+ fun testSuppressedException() = runTest {
+ val completable = rxCompletable(currentDispatcher()) {
+ launch(start = CoroutineStart.ATOMIC) {
+ throw TestException() // child coroutine fails
+ }
+ try {
+ delay(Long.MAX_VALUE)
+ } finally {
+ throw TestException2() // but parent throws another exception while cleaning up
+ }
+ }
+ try {
+ completable.await()
+ expectUnreached()
+ } catch (e: TestException) {
+ assertTrue(e.suppressed[0] is TestException2)
+ }
+ }
+
+ @Test
+ fun testUnhandledException() = runTest() {
+ expect(1)
+ var disposable: Disposable? = null
+ val handler = { e: Throwable ->
+ assertTrue(e is UndeliverableException && e.cause is TestException)
+ expect(5)
+ }
+ val completable = rxCompletable(currentDispatcher()) {
+ expect(4)
+ disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
+ try {
+ delay(Long.MAX_VALUE)
+ } finally {
+ throw TestException() // would not be able to handle it since mono is disposed
+ }
+ }
+ withExceptionHandler(handler) {
+ completable.subscribe(object : CompletableObserver {
+ override fun onSubscribe(d: Disposable) {
+ expect(2)
+ disposable = d
+ }
+
+ override fun onComplete() {
+ expectUnreached()
+ }
+
+ override fun onError(t: Throwable) {
+ expectUnreached()
+ }
+ })
+ expect(3)
+ yield() // run coroutine
+ finish(6)
+ }
+ }
+
+ @Test
+ fun testFatalExceptionInSubscribe() = runTest {
+ val handler: (Throwable) -> Unit = { e ->
+ assertTrue(e is UndeliverableException && e.cause is LinkageError); expect(2)
+ }
+
+ withExceptionHandler(handler) {
+ rxCompletable(Dispatchers.Unconfined) {
+ expect(1)
+ 42
+ }.subscribe({ throw LinkageError() })
+ finish(3)
+ }
+ }
+
+ @Test
+ fun testFatalExceptionInSingle() = runTest {
+ rxCompletable(Dispatchers.Unconfined) {
+ throw LinkageError()
+ }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) })
+ finish(2)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ConvertTest.kt b/reactive/kotlinx-coroutines-rx3/test/ConvertTest.kt
new file mode 100644
index 0000000..e2fe533
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ConvertTest.kt
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.consumeAsFlow
+import org.junit.Assert
+import org.junit.Test
+import kotlin.test.*
+
+class ConvertTest : TestBase() {
+ @Test
+ fun testToCompletableSuccess() = runBlocking {
+ expect(1)
+ val job = launch {
+ expect(3)
+ }
+ val completable = job.asCompletable(coroutineContext.minusKey(Job))
+ completable.subscribe {
+ expect(4)
+ }
+ expect(2)
+ yield()
+ finish(5)
+ }
+
+ @Test
+ fun testToCompletableFail() = runBlocking {
+ expect(1)
+ val job = async(NonCancellable) { // don't kill parent on exception
+ expect(3)
+ throw RuntimeException("OK")
+ }
+ val completable = job.asCompletable(coroutineContext.minusKey(Job))
+ completable.subscribe {
+ expect(4)
+ }
+ expect(2)
+ yield()
+ finish(5)
+ }
+
+ @Test
+ fun testToMaybe() {
+ val d = GlobalScope.async {
+ delay(50)
+ "OK"
+ }
+ val maybe1 = d.asMaybe(Dispatchers.Unconfined)
+ checkMaybeValue(maybe1) {
+ assertEquals("OK", it)
+ }
+ val maybe2 = d.asMaybe(Dispatchers.Unconfined)
+ checkMaybeValue(maybe2) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testToMaybeEmpty() {
+ val d = GlobalScope.async {
+ delay(50)
+ null
+ }
+ val maybe1 = d.asMaybe(Dispatchers.Unconfined)
+ checkMaybeValue(maybe1, Assert::assertNull)
+ val maybe2 = d.asMaybe(Dispatchers.Unconfined)
+ checkMaybeValue(maybe2, Assert::assertNull)
+ }
+
+ @Test
+ fun testToMaybeFail() {
+ val d = GlobalScope.async {
+ delay(50)
+ throw TestRuntimeException("OK")
+ }
+ val maybe1 = d.asMaybe(Dispatchers.Unconfined)
+ checkErroneous(maybe1) {
+ check(it is TestRuntimeException && it.message == "OK") { "$it" }
+ }
+ val maybe2 = d.asMaybe(Dispatchers.Unconfined)
+ checkErroneous(maybe2) {
+ check(it is TestRuntimeException && it.message == "OK") { "$it" }
+ }
+ }
+
+ @Test
+ fun testToSingle() {
+ val d = GlobalScope.async {
+ delay(50)
+ "OK"
+ }
+ val single1 = d.asSingle(Dispatchers.Unconfined)
+ checkSingleValue(single1) {
+ assertEquals("OK", it)
+ }
+ val single2 = d.asSingle(Dispatchers.Unconfined)
+ checkSingleValue(single2) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testToSingleFail() {
+ val d = GlobalScope.async {
+ delay(50)
+ throw TestRuntimeException("OK")
+ }
+ val single1 = d.asSingle(Dispatchers.Unconfined)
+ checkErroneous(single1) {
+ check(it is TestRuntimeException && it.message == "OK") { "$it" }
+ }
+ val single2 = d.asSingle(Dispatchers.Unconfined)
+ checkErroneous(single2) {
+ check(it is TestRuntimeException && it.message == "OK") { "$it" }
+ }
+ }
+
+ @Test
+ fun testToObservable() {
+ val c = GlobalScope.produce {
+ delay(50)
+ send("O")
+ delay(50)
+ send("K")
+ }
+ val observable = c.consumeAsFlow().asObservable()
+ checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testToObservableFail() {
+ val c = GlobalScope.produce {
+ delay(50)
+ send("O")
+ delay(50)
+ throw TestException("K")
+ }
+ val observable = c.consumeAsFlow().asObservable()
+ val single = rxSingle(Dispatchers.Unconfined) {
+ var result = ""
+ try {
+ observable.collect { result += it }
+ } catch(e: Throwable) {
+ check(e is TestException)
+ result += e.message
+ }
+ result
+ }
+ checkSingleValue(single) {
+ assertEquals("OK", it)
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt
new file mode 100644
index 0000000..50c4ae7
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.junit.Test
+import kotlin.test.*
+
+class FlowAsObservableTest : TestBase() {
+ @Test
+ fun testBasicSuccess() = runTest {
+ expect(1)
+ val observable = flow {
+ expect(3)
+ emit("OK")
+ }.asObservable()
+
+ expect(2)
+ observable.subscribe { value ->
+ expect(4)
+ assertEquals("OK", value)
+ }
+
+ finish(5)
+ }
+
+ @Test
+ fun testBasicFailure() = runTest {
+ expect(1)
+ val observable = flow<Int> {
+ expect(3)
+ throw RuntimeException("OK")
+ }.asObservable()
+
+ expect(2)
+ observable.subscribe({ expectUnreached() }, { error ->
+ expect(4)
+ assertTrue(error is RuntimeException)
+ assertEquals("OK", error.message)
+ })
+ finish(5)
+ }
+
+ @Test
+ fun testBasicUnsubscribe() = runTest {
+ expect(1)
+ val observable = flow<Int> {
+ expect(3)
+ hang {
+ expect(4)
+ }
+ }.asObservable()
+
+ expect(2)
+ val sub = observable.subscribe({ expectUnreached() }, { expectUnreached() })
+ sub.dispose() // will cancel coroutine
+ finish(5)
+ }
+
+ @Test
+ fun testNotifyOnceOnCancellation() = runTest {
+ val observable =
+ flow {
+ expect(3)
+ emit("OK")
+ hang {
+ expect(7)
+ }
+ }.asObservable()
+ .doOnNext {
+ expect(4)
+ assertEquals("OK", it)
+ }
+ .doOnDispose {
+ expect(6) // notified once!
+ }
+
+ expect(1)
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ observable.collect {
+ expect(5)
+ assertEquals("OK", it)
+ }
+ }
+
+ yield()
+ job.cancelAndJoin()
+ finish(8)
+ }
+
+ @Test
+ fun testFailingConsumer() = runTest {
+ expect(1)
+ val observable = flow {
+ expect(2)
+ emit("OK")
+ hang {
+ expect(4)
+ }
+
+ }.asObservable()
+
+ try {
+ observable.collect {
+ expect(3)
+ throw TestException()
+ }
+ } catch (e: TestException) {
+ finish(5)
+ }
+ }
+
+ @Test
+ fun testNonAtomicStart() = runTest {
+ withContext(Dispatchers.Unconfined) {
+ val observable = flow<Int> {
+ expect(1)
+ }.asObservable()
+
+ val disposable = observable.subscribe({ expectUnreached() }, { expectUnreached() }, { expectUnreached() })
+ disposable.dispose()
+ }
+ finish(2)
+ }
+
+ @Test
+ fun testFlowCancelledFromWithin() = runTest {
+ val observable = flow {
+ expect(1)
+ emit(1)
+ kotlin.coroutines.coroutineContext.cancel()
+ kotlin.coroutines.coroutineContext.ensureActive()
+ expectUnreached()
+ }.asObservable()
+
+ observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) })
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowableContextTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowableContextTest.kt
new file mode 100644
index 0000000..b70e0d5
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowableContextTest.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import org.junit.*
+import org.junit.Test
+import kotlin.test.*
+
+class FlowableContextTest : TestBase() {
+ private val dispatcher = newSingleThreadContext("FlowableContextTest")
+
+ @After
+ fun tearDown() {
+ dispatcher.close()
+ }
+
+ @Test
+ fun testFlowableCreateAsFlowThread() = runTest {
+ expect(1)
+ val mainThread = Thread.currentThread()
+ val dispatcherThread = withContext(dispatcher) { Thread.currentThread() }
+ assertTrue(dispatcherThread != mainThread)
+ Flowable.create<String>({
+ assertEquals(dispatcherThread, Thread.currentThread())
+ it.onNext("OK")
+ it.onComplete()
+ }, BackpressureStrategy.BUFFER)
+ .asFlow()
+ .flowOn(dispatcher)
+ .collect {
+ expect(2)
+ assertEquals("OK", it)
+ assertEquals(mainThread, Thread.currentThread())
+ }
+ finish(3)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
new file mode 100644
index 0000000..8cbd7ee
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.exceptions.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import kotlin.test.*
+
+class FlowableExceptionHandlingTest : TestBase() {
+
+ @Before
+ fun setup() {
+ ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+ }
+
+ private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
+ assertTrue(t is UndeliverableException && t.cause is T)
+ expect(expect)
+ }
+
+ private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() }
+
+ @Test
+ fun testException() = withExceptionHandler({ expectUnreached() }) {
+ rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
+ expect(1)
+ throw TestException()
+ }.subscribe({
+ expectUnreached()
+ }, {
+ expect(2) // Reported to onError
+ })
+ finish(3)
+ }
+
+ @Test
+ fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
+ rxFlowable<Int>(Dispatchers.Unconfined) {
+ expect(1)
+ throw LinkageError()
+ }.subscribe({
+ expectUnreached()
+ }, {
+ expect(2) // Fatal exception is reported to both onError and CEH
+ })
+ finish(4)
+ }
+
+ @Test
+ fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
+ rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
+ expect(1)
+ throw TestException()
+ }.publish()
+ .refCount()
+ .subscribe({
+ expectUnreached()
+ }, {
+ expect(2) // Reported to onError
+ })
+ finish(3)
+ }
+
+ @Test
+ fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ rxFlowable<Int>(Dispatchers.Unconfined) {
+ expect(1)
+ throw LinkageError()
+ }.publish()
+ .refCount()
+ .subscribe({
+ expectUnreached()
+ }, {
+ expect(2)
+ })
+ finish(4)
+ }
+
+ @Test
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ rxFlowable(Dispatchers.Unconfined) {
+ expect(1)
+ send(Unit)
+ }.subscribe({
+ expect(2)
+ throw LinkageError()
+ }, { expect(3) }) // Fatal exception is reported to both onError and CEH
+ finish(5)
+ }
+
+ @Test
+ fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
+ rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
+ expect(1)
+ send(Unit)
+ }.subscribe({
+ expect(2)
+ throw TestException()
+ }, { expect(3) }) // not reported to onError because came from the subscribe itself
+ finish(4)
+ }
+
+ @Test
+ fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
+ rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
+ expect(1)
+ send(Unit)
+ }.publish()
+ .refCount()
+ .subscribe({
+ expect(2)
+ throw RuntimeException()
+ }, { expect(3) })
+ finish(4)
+ }
+
+ @Test
+ fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
+ rxFlowable(Dispatchers.Unconfined) {
+ expect(1)
+ send(Unit)
+ }.publish()
+ .refCount()
+ .subscribe({
+ expect(2)
+ throw LinkageError()
+ }, { expectUnreached() })
+ finish(4)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowableTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowableTest.kt
new file mode 100644
index 0000000..746d6e8
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowableTest.kt
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.reactive.*
+import org.junit.Test
+import kotlin.test.*
+
+class FlowableTest : TestBase() {
+ @Test
+ fun testBasicSuccess() = runBlocking {
+ expect(1)
+ val observable = rxFlowable(currentDispatcher()) {
+ expect(4)
+ send("OK")
+ }
+ expect(2)
+ observable.subscribe { value ->
+ expect(5)
+ assertEquals("OK", value)
+ }
+ expect(3)
+ yield() // to started coroutine
+ finish(6)
+ }
+
+ @Test
+ fun testBasicFailure() = runBlocking {
+ expect(1)
+ val observable = rxFlowable<String>(currentDispatcher()) {
+ expect(4)
+ throw RuntimeException("OK")
+ }
+ expect(2)
+ observable.subscribe({
+ expectUnreached()
+ }, { error ->
+ expect(5)
+ assertTrue(error is RuntimeException)
+ assertEquals("OK", error.message)
+ })
+ expect(3)
+ yield() // to started coroutine
+ finish(6)
+ }
+
+ @Test
+ fun testBasicUnsubscribe() = runBlocking {
+ expect(1)
+ val observable = rxFlowable<String>(currentDispatcher()) {
+ expect(4)
+ yield() // back to main, will get cancelled
+ expectUnreached()
+ }
+ expect(2)
+ val sub = observable.subscribe({
+ expectUnreached()
+ }, {
+ expectUnreached()
+ })
+ expect(3)
+ yield() // to started coroutine
+ expect(5)
+ sub.dispose() // will cancel coroutine
+ yield()
+ finish(6)
+ }
+
+ @Test
+ fun testNotifyOnceOnCancellation() = runTest {
+ expect(1)
+ val observable =
+ rxFlowable(currentDispatcher()) {
+ expect(5)
+ send("OK")
+ try {
+ delay(Long.MAX_VALUE)
+ } catch (e: CancellationException) {
+ expect(11)
+ }
+ }
+ .doOnNext {
+ expect(6)
+ assertEquals("OK", it)
+ }
+ .doOnCancel {
+ expect(10) // notified once!
+ }
+ expect(2)
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(3)
+ observable.collect {
+ expect(8)
+ assertEquals("OK", it)
+ }
+ }
+ expect(4)
+ yield() // to observable code
+ expect(7)
+ yield() // to consuming coroutines
+ expect(9)
+ job.cancel()
+ job.join()
+ finish(12)
+ }
+
+ @Test
+ fun testFailingConsumer() = runTest {
+ val pub = rxFlowable(currentDispatcher()) {
+ repeat(3) {
+ expect(it + 1) // expect(1), expect(2) *should* be invoked
+ send(it)
+ }
+ }
+ try {
+ pub.collect {
+ throw TestException()
+ }
+ } catch (e: TestException) {
+ finish(3)
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
new file mode 100644
index 0000000..395672c
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.consumeAsFlow
+import org.junit.Test
+import org.junit.runner.*
+import org.junit.runners.*
+import kotlin.coroutines.*
+import kotlin.test.*
+
+@RunWith(Parameterized::class)
+class IntegrationTest(
+ private val ctx: Ctx,
+ private val delay: Boolean
+) : TestBase() {
+
+ enum class Ctx {
+ MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },
+ DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },
+ UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };
+
+ abstract operator fun invoke(context: CoroutineContext): CoroutineContext
+ }
+
+ companion object {
+ @Parameterized.Parameters(name = "ctx={0}, delay={1}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
+ listOf(false, true).map { delay ->
+ arrayOf(ctx, delay)
+ }
+ }
+ }
+
+ @Test
+ fun testEmpty(): Unit = runBlocking {
+ val observable = rxObservable<String>(ctx(coroutineContext)) {
+ if (delay) delay(1)
+ // does not send anything
+ }
+ assertFailsWith<NoSuchElementException> { observable.awaitFirst() }
+ assertEquals("OK", observable.awaitFirstOrDefault("OK"))
+ assertNull(observable.awaitFirstOrNull())
+ assertEquals("ELSE", observable.awaitFirstOrElse { "ELSE" })
+ assertFailsWith<NoSuchElementException> { observable.awaitLast() }
+ assertFailsWith<NoSuchElementException> { observable.awaitSingle() }
+ var cnt = 0
+ observable.collect {
+ cnt++
+ }
+ assertEquals(0, cnt)
+ }
+
+ @Test
+ fun testSingle() = runBlocking {
+ val observable = rxObservable(ctx(coroutineContext)) {
+ if (delay) delay(1)
+ send("OK")
+ }
+ assertEquals("OK", observable.awaitFirst())
+ assertEquals("OK", observable.awaitFirstOrDefault("OK"))
+ assertEquals("OK", observable.awaitFirstOrNull())
+ assertEquals("OK", observable.awaitFirstOrElse { "ELSE" })
+ assertEquals("OK", observable.awaitLast())
+ assertEquals("OK", observable.awaitSingle())
+ var cnt = 0
+ observable.collect {
+ assertEquals("OK", it)
+ cnt++
+ }
+ assertEquals(1, cnt)
+ }
+
+ @Test
+ fun testNumbers() = runBlocking<Unit> {
+ val n = 100 * stressTestMultiplier
+ val observable = rxObservable(ctx(coroutineContext)) {
+ for (i in 1..n) {
+ send(i)
+ if (delay) delay(1)
+ }
+ }
+ assertEquals(1, observable.awaitFirst())
+ assertEquals(1, observable.awaitFirstOrDefault(0))
+ assertEquals(1, observable.awaitFirstOrNull())
+ assertEquals(1, observable.awaitFirstOrElse { 0 })
+ assertEquals(n, observable.awaitLast())
+ assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
+ checkNumbers(n, observable)
+ val channel = observable.openSubscription()
+ ctx(coroutineContext)
+ checkNumbers(n, channel.consumeAsFlow().asObservable())
+ channel.cancel()
+ }
+
+ @Test
+ fun testCancelWithoutValue() = runTest {
+ val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
+ rxObservable<String> {
+ hang { }
+ }.awaitFirst()
+ }
+
+ job.cancel()
+ job.join()
+ }
+
+ @Test
+ fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
+ expect(1)
+ val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
+ rxObservable<String> {
+ yield()
+ expect(2)
+ // Nothing to emit
+ }.awaitFirst()
+ }
+
+ job.join()
+ finish(3)
+ }
+
+ private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
+ var last = 0
+ observable.collect {
+ assertEquals(++last, it)
+ }
+ assertEquals(n, last)
+ }
+
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/IterableFlowAsFlowableTckTest.kt b/reactive/kotlinx-coroutines-rx3/test/IterableFlowAsFlowableTckTest.kt
new file mode 100644
index 0000000..8e54922
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/IterableFlowAsFlowableTckTest.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.flow.*
+import org.junit.*
+import org.reactivestreams.*
+import org.reactivestreams.tck.*
+
+class IterableFlowAsFlowableTckTest : PublisherVerification<Long>(TestEnvironment()) {
+
+ private fun generate(num: Long): Array<Long> {
+ return Array(if (num >= Integer.MAX_VALUE) 1000000 else num.toInt()) { it.toLong() }
+ }
+
+ override fun createPublisher(elements: Long): Flowable<Long> {
+ return generate(elements).asIterable().asFlow().asFlowable()
+ }
+
+ override fun createFailedPublisher(): Publisher<Long>? = null
+
+ @Ignore
+ override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
+ }
+
+ @Ignore
+ override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
+ }
+
+ @Ignore
+ override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() {
+ //
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/LeakedExceptionTest.kt b/reactive/kotlinx-coroutines-rx3/test/LeakedExceptionTest.kt
new file mode 100644
index 0000000..028ded0
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/LeakedExceptionTest.kt
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.exceptions.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import org.junit.Test
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import kotlin.test.*
+
+// Check that exception is not leaked to the global exception handler
+class LeakedExceptionTest : TestBase() {
+
+ private val handler: (Throwable) -> Unit =
+ { assertTrue { it is UndeliverableException && it.cause is TestException } }
+
+ @Test
+ fun testSingle() = withExceptionHandler(handler) {
+ withFixedThreadPool(4) { dispatcher ->
+ val flow = rxSingle<Unit>(dispatcher) { throw TestException() }.toFlowable().asFlow()
+ runBlocking {
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect {}
+ }
+ }
+ }
+ }
+
+ @Test
+ fun testObservable() = withExceptionHandler(handler) {
+ withFixedThreadPool(4) { dispatcher ->
+ val flow = rxObservable<Unit>(dispatcher) { throw TestException() }
+ .toFlowable(BackpressureStrategy.BUFFER)
+ .asFlow()
+ runBlocking {
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect {}
+ }
+ }
+ }
+ }
+
+ @Test
+ fun testFlowable() = withExceptionHandler(handler) {
+ withFixedThreadPool(4) { dispatcher ->
+ val flow = rxFlowable<Unit>(dispatcher) { throw TestException() }.asFlow()
+ runBlocking {
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect {}
+ }
+ }
+ }
+ }
+
+ /**
+ * This test doesn't test much and was added to display a problem with straighforward use of
+ * [withExceptionHandler].
+ *
+ * If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context,
+ * this test would fail fairly often, while other tests were also vulnerable, but the problem is
+ * much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher`
+ * to other tests.
+ *
+ * See the commit that introduced this test for a better explanation.
+ */
+ @Test
+ fun testResettingExceptionHandler() = withExceptionHandler(handler) {
+ withFixedThreadPool(4) { dispatcher ->
+ val flow = rxFlowable<Unit>(dispatcher) {
+ if ((0..1).random() == 0) {
+ Thread.sleep(100)
+ }
+ throw TestException()
+ }.asFlow()
+ runBlocking {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect {}
+ }
+ }
+ }
+
+ /**
+ * Run in a thread pool, then wait for all the tasks to finish.
+ */
+ private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) {
+ val pool = Executors.newFixedThreadPool(numberOfThreads)
+ val dispatcher = pool.asCoroutineDispatcher()
+ block(dispatcher)
+ pool.shutdown()
+ while (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ /* deliberately empty */
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt
new file mode 100644
index 0000000..e0cec74
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt
@@ -0,0 +1,316 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
+import io.reactivex.rxjava3.exceptions.*
+import io.reactivex.rxjava3.functions.*
+import io.reactivex.rxjava3.internal.functions.Functions.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+import java.util.concurrent.CancellationException
+import kotlin.test.*
+
+class MaybeTest : TestBase() {
+ @Before
+ fun setup() {
+ ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+ }
+
+ @Test
+ fun testBasicSuccess() = runBlocking {
+ expect(1)
+ val maybe = rxMaybe(currentDispatcher()) {
+ expect(4)
+ "OK"
+ }
+ expect(2)
+ maybe.subscribe { value ->
+ expect(5)
+ assertEquals("OK", value)
+ }
+ expect(3)
+ yield() // to started coroutine
+ finish(6)
+ }
+
+ @Test
+ fun testBasicEmpty() = runBlocking {
+ expect(1)
+ val maybe = rxMaybe(currentDispatcher()) {
+ expect(4)
+ null
+ }
+ expect(2)
+ maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, Action {
+ expect(5)
+ })
+ expect(3)
+ yield() // to started coroutine
+ finish(6)
+ }
+
+ @Test
+ fun testBasicFailure() = runBlocking {
+ expect(1)
+ val maybe = rxMaybe(currentDispatcher()) {
+ expect(4)
+ throw RuntimeException("OK")
+ }
+ expect(2)
+ maybe.subscribe({
+ expectUnreached()
+ }, { error ->
+ expect(5)
+ assertTrue(error is RuntimeException)
+ assertEquals("OK", error.message)
+ })
+ expect(3)
+ yield() // to started coroutine
+ finish(6)
+ }
+
+
+ @Test
+ fun testBasicUnsubscribe() = runBlocking {
+ expect(1)
+ val maybe = rxMaybe(currentDispatcher()) {
+ expect(4)
+ yield() // back to main, will get cancelled
+ expectUnreached()
+ }
+ expect(2)
+ // nothing is called on a disposed rx3 maybe
+ val sub = maybe.subscribe({
+ expectUnreached()
+ }, {
+ expectUnreached()
+ })
+ expect(3)
+ yield() // to started coroutine
+ expect(5)
+ sub.dispose() // will cancel coroutine
+ yield()
+ finish(6)
+ }
+
+ @Test
+ fun testMaybeNoWait() {
+ val maybe = rxMaybe {
+ "OK"
+ }
+
+ checkMaybeValue(maybe) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testMaybeAwait() = runBlocking {
+ assertEquals("OK", Maybe.just("O").await() + "K")
+ }
+
+ @Test
+ fun testMaybeAwaitForNull() = runBlocking {
+ assertNull(Maybe.empty<String>().await())
+ }
+
+ @Test
+ fun testMaybeEmitAndAwait() {
+ val maybe = rxMaybe {
+ Maybe.just("O").await() + "K"
+ }
+
+ checkMaybeValue(maybe) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testMaybeWithDelay() {
+ val maybe = rxMaybe {
+ Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
+ }
+
+ checkMaybeValue(maybe) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testMaybeException() {
+ val maybe = rxMaybe {
+ Observable.just("O", "K").awaitSingle() + "K"
+ }
+
+ checkErroneous(maybe) {
+ assert(it is IllegalArgumentException)
+ }
+ }
+
+ @Test
+ fun testAwaitFirst() {
+ val maybe = rxMaybe {
+ Observable.just("O", "#").awaitFirst() + "K"
+ }
+
+ checkMaybeValue(maybe) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testAwaitLast() {
+ val maybe = rxMaybe {
+ Observable.just("#", "O").awaitLast() + "K"
+ }
+
+ checkMaybeValue(maybe) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testExceptionFromObservable() {
+ val maybe = rxMaybe {
+ try {
+ Observable.error<String>(RuntimeException("O")).awaitFirst()
+ } catch (e: RuntimeException) {
+ Observable.just(e.message!!).awaitLast() + "K"
+ }
+ }
+
+ checkMaybeValue(maybe) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testExceptionFromCoroutine() {
+ val maybe = rxMaybe<String> {
+ throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
+ }
+
+ checkErroneous(maybe) {
+ assert(it is IllegalStateException)
+ assertEquals("OK", it.message)
+ }
+ }
+
+ @Test
+ fun testCancelledConsumer() = runTest {
+ expect(1)
+ val maybe = rxMaybe<Int>(currentDispatcher()) {
+ expect(4)
+ try {
+ delay(Long.MAX_VALUE)
+ } catch (e: CancellationException) {
+ expect(6)
+ }
+ 42
+ }
+ expect(2)
+ val timeout = withTimeoutOrNull(100) {
+ expect(3)
+ maybe.collect {
+ expectUnreached()
+ }
+ expectUnreached()
+ }
+ assertNull(timeout)
+ expect(5)
+ yield() // must cancel code inside maybe!!!
+ finish(7)
+ }
+
+ @Test
+ fun testSuppressedException() = runTest {
+ val maybe = rxMaybe(currentDispatcher()) {
+ launch(start = CoroutineStart.ATOMIC) {
+ throw TestException() // child coroutine fails
+ }
+ try {
+ delay(Long.MAX_VALUE)
+ } finally {
+ throw TestException2() // but parent throws another exception while cleaning up
+ }
+ }
+ try {
+ maybe.await()
+ expectUnreached()
+ } catch (e: TestException) {
+ assertTrue(e.suppressed[0] is TestException2)
+ }
+ }
+
+ @Test
+ fun testUnhandledException() = runTest {
+ expect(1)
+ var disposable: Disposable? = null
+ val handler = { e: Throwable ->
+ assertTrue(e is UndeliverableException && e.cause is TestException)
+ expect(5)
+ }
+ val maybe = rxMaybe(currentDispatcher()) {
+ expect(4)
+ disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
+ try {
+ delay(Long.MAX_VALUE)
+ } finally {
+ throw TestException() // would not be able to handle it since mono is disposed
+ }
+ }
+ withExceptionHandler(handler) {
+ maybe.subscribe(object : MaybeObserver<Unit> {
+ override fun onSubscribe(d: Disposable) {
+ expect(2)
+ disposable = d
+ }
+
+ override fun onComplete() {
+ expectUnreached()
+ }
+
+ override fun onSuccess(t: Unit) {
+ expectUnreached()
+ }
+
+ override fun onError(t: Throwable) {
+ expectUnreached()
+ }
+ })
+ expect(3)
+ yield() // run coroutine
+ finish(6)
+ }
+ }
+
+ @Test
+ fun testFatalExceptionInSubscribe() = runTest {
+ val handler = { e: Throwable ->
+ assertTrue(e is UndeliverableException && e.cause is LinkageError)
+ expect(2)
+ }
+
+ withExceptionHandler(handler) {
+ rxMaybe(Dispatchers.Unconfined) {
+ expect(1)
+ 42
+ }.subscribe({ throw LinkageError() })
+ finish(3)
+ }
+ }
+
+ @Test
+ fun testFatalExceptionInSingle() = runTest {
+ rxMaybe(Dispatchers.Unconfined) {
+ throw LinkageError()
+ }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) })
+ finish(2)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableAsFlowTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableAsFlowTest.kt
new file mode 100644
index 0000000..262da9a
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableAsFlowTest.kt
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.Observable
+import io.reactivex.rxjava3.core.ObservableSource
+import io.reactivex.rxjava3.core.Observer
+import io.reactivex.rxjava3.disposables.Disposable
+import io.reactivex.rxjava3.subjects.PublishSubject
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+
+class ObservableAsFlowTest : TestBase() {
+ @Test
+ fun testCancellation() = runTest {
+ var onNext = 0
+ var onCancelled = 0
+ var onError = 0
+
+ val source = rxObservable(currentDispatcher()) {
+ coroutineContext[Job]?.invokeOnCompletion {
+ if (it is CancellationException) ++onCancelled
+ }
+
+ repeat(100) {
+ send(it)
+ }
+ }
+
+ source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
+ onEach {
+ ++onNext
+ throw RuntimeException()
+ }
+ catch<Throwable> {
+ ++onError
+ }
+ }.join()
+
+
+ assertEquals(1, onNext)
+ assertEquals(1, onError)
+ assertEquals(1, onCancelled)
+ }
+
+ @Test
+ fun testImmediateCollection() {
+ val source = PublishSubject.create<Int>()
+ val flow = source.asFlow()
+ GlobalScope.launch(Dispatchers.Unconfined) {
+ expect(1)
+ flow.collect { expect(it) }
+ expect(6)
+ }
+ expect(2)
+ source.onNext(3)
+ expect(4)
+ source.onNext(5)
+ source.onComplete()
+ finish(7)
+ }
+
+ @Test
+ fun testOnErrorCancellation() {
+ val source = PublishSubject.create<Int>()
+ val flow = source.asFlow()
+ val exception = RuntimeException()
+ GlobalScope.launch(Dispatchers.Unconfined) {
+ try {
+ expect(1)
+ flow.collect { expect(it) }
+ expectUnreached()
+ }
+ catch (e: Exception) {
+ assertSame(exception, e.cause)
+ expect(5)
+ }
+ expect(6)
+ }
+ expect(2)
+ source.onNext(3)
+ expect(4)
+ source.onError(exception)
+ finish(7)
+ }
+
+ @Test
+ fun testUnsubscribeOnCollectionException() {
+ val source = PublishSubject.create<Int>()
+ val flow = source.asFlow()
+ val exception = RuntimeException()
+ GlobalScope.launch(Dispatchers.Unconfined) {
+ try {
+ expect(1)
+ flow.collect {
+ expect(it)
+ if (it == 3) throw exception
+ }
+ expectUnreached()
+ }
+ catch (e: Exception) {
+ assertSame(exception, e.cause)
+ expect(4)
+ }
+ expect(5)
+ }
+ expect(2)
+ assertTrue(source.hasObservers())
+ source.onNext(3)
+ assertFalse(source.hasObservers())
+ finish(6)
+ }
+
+ @Test
+ fun testLateOnSubscribe() {
+ var observer: Observer<in Int>? = null
+ val source = ObservableSource<Int> { observer = it }
+ val flow = source.asFlow()
+ assertNull(observer)
+ val job = GlobalScope.launch(Dispatchers.Unconfined) {
+ expect(1)
+ flow.collect { expectUnreached() }
+ expectUnreached()
+ }
+ expect(2)
+ assertNotNull(observer)
+ job.cancel()
+ val disposable = Disposable.empty()
+ observer!!.onSubscribe(disposable)
+ assertTrue(disposable.isDisposed)
+ finish(3)
+ }
+
+ @Test
+ fun testBufferUnlimited() = runTest {
+ val source = rxObservable(currentDispatcher()) {
+ expect(1); send(10)
+ expect(2); send(11)
+ expect(3); send(12)
+ expect(4); send(13)
+ expect(5); send(14)
+ expect(6); send(15)
+ expect(7); send(16)
+ expect(8); send(17)
+ expect(9)
+ }
+ source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) }
+ finish(18)
+ }
+
+ @Test
+ fun testConflated() = runTest {
+ val source = Observable.range(1, 5)
+ val list = source.asFlow().conflate().toList()
+ assertEquals(listOf(1, 5), list)
+ }
+
+ @Test
+ fun testLongRange() = runTest {
+ val source = Observable.range(1, 10_000)
+ val count = source.asFlow().count()
+ assertEquals(10_000, count)
+ }
+
+ @Test
+ fun testProduce() = runTest {
+ val source = Observable.range(0, 10)
+ val flow = source.asFlow()
+ check((0..9).toList(), flow.produceIn(this))
+ check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
+ check((0..9).toList(), flow.buffer(2).produceIn(this))
+ check((0..9).toList(), flow.buffer(0).produceIn(this))
+ check(listOf(0, 9), flow.conflate().produceIn(this))
+ }
+
+ private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
+ val result = ArrayList<Int>(10)
+ channel.consumeEach { result.add(it) }
+ assertEquals(expected, result)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableCompletionStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableCompletionStressTest.kt
new file mode 100644
index 0000000..c1d25bc
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableCompletionStressTest.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import kotlinx.coroutines.*
+import org.junit.*
+import java.util.*
+import kotlin.coroutines.*
+
+class ObservableCompletionStressTest : TestBase() {
+ private val N_REPEATS = 10_000 * stressTestMultiplier
+
+ private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = rxObservable(context) {
+ for (x in start until start + count) send(x)
+ }
+
+ @Test
+ fun testCompletion() {
+ val rnd = Random()
+ repeat(N_REPEATS) {
+ val count = rnd.nextInt(5)
+ runBlocking {
+ withTimeout(5000) {
+ var received = 0
+ range(Dispatchers.Default, 1, count).collect { x ->
+ received++
+ if (x != received) error("$x != $received")
+ }
+ if (received != count) error("$received != $count")
+ }
+ }
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
new file mode 100644
index 0000000..1183b2a
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.exceptions.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import kotlin.test.*
+
+class ObservableExceptionHandlingTest : TestBase() {
+
+ @Before
+ fun setup() {
+ ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+ }
+
+ private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
+ assertTrue(t is UndeliverableException && t.cause is T)
+ expect(expect)
+ }
+
+ private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() }
+
+ @Test
+ fun testException() = withExceptionHandler({ expectUnreached() }) {
+ rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
+ expect(1)
+ throw TestException()
+ }.subscribe({
+ expectUnreached()
+ }, {
+ expect(2) // Reported to onError
+ })
+ finish(3)
+ }
+
+ @Test
+ fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
+ rxObservable<Int>(Dispatchers.Unconfined) {
+ expect(1)
+ throw LinkageError()
+ }.subscribe({
+ expectUnreached()
+ }, {
+ expect(2)
+ })
+ finish(4)
+ }
+
+ @Test
+ fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
+ rxObservable<Int>(Dispatchers.Unconfined) {
+ expect(1)
+ throw TestException()
+ }.publish()
+ .refCount()
+ .subscribe({
+ expectUnreached()
+ }, {
+ expect(2) // Reported to onError
+ })
+ finish(3)
+ }
+
+ @Test
+ fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ rxObservable<Int>(Dispatchers.Unconfined) {
+ expect(1)
+ throw LinkageError()
+ }.publish()
+ .refCount()
+ .subscribe({
+ expectUnreached()
+ }, {
+ expect(2) // Fatal exception is not reported in onError
+ })
+ finish(4)
+ }
+
+ @Test
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ rxObservable(Dispatchers.Unconfined) {
+ expect(1)
+ send(Unit)
+ }.subscribe({
+ expect(2)
+ throw LinkageError()
+ }, { expect(3) }) // Unreached because fatal errors are rethrown
+ finish(5)
+ }
+
+ @Test
+ fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
+ rxObservable(Dispatchers.Unconfined) {
+ expect(1)
+ send(Unit)
+ }.subscribe({
+ expect(2)
+ throw TestException()
+ }, { expect(3) }) // not reported to onError because came from the subscribe itself
+ finish(4)
+ }
+
+ @Test
+ fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
+ rxObservable(Dispatchers.Unconfined) {
+ expect(1)
+ send(Unit)
+ }.publish()
+ .refCount()
+ .subscribe({
+ expect(2)
+ throw RuntimeException()
+ }, { expect(3) })
+ finish(4)
+ }
+
+ @Test
+ fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ rxObservable(Dispatchers.Unconfined) {
+ expect(1)
+ send(Unit)
+ }.publish()
+ .refCount()
+ .subscribe({
+ expect(2)
+ throw LinkageError()
+ }, { expect(3) })
+ finish(5)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt
new file mode 100644
index 0000000..b4adf7a
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableMultiTest.kt
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import org.junit.Test
+import java.io.*
+import kotlin.test.*
+
+/**
+ * Test emitting multiple values with [rxObservable].
+ */
+class ObservableMultiTest : TestBase() {
+ @Test
+ fun testNumbers() {
+ val n = 100 * stressTestMultiplier
+ val observable = rxObservable {
+ repeat(n) { send(it) }
+ }
+ checkSingleValue(observable.toList()) { list ->
+ assertEquals((0 until n).toList(), list)
+ }
+ }
+
+
+ @Test
+ fun testConcurrentStress() {
+ val n = 10_000 * stressTestMultiplier
+ val observable = rxObservable {
+ newCoroutineContext(coroutineContext)
+ // concurrent emitters (many coroutines)
+ val jobs = List(n) {
+ // launch
+ launch {
+ val i = it
+ send(i)
+ }
+ }
+ jobs.forEach { it.join() }
+ }
+ checkSingleValue(observable.toList()) { list ->
+ assertEquals(n, list.size)
+ assertEquals((0 until n).toList(), list.sorted())
+ }
+ }
+
+ @Test
+ fun testIteratorResendUnconfined() {
+ val n = 10_000 * stressTestMultiplier
+ val observable = rxObservable(Dispatchers.Unconfined) {
+ Observable.range(0, n).collect { send(it) }
+ }
+ checkSingleValue(observable.toList()) { list ->
+ assertEquals((0 until n).toList(), list)
+ }
+ }
+
+ @Test
+ fun testIteratorResendPool() {
+ val n = 10_000 * stressTestMultiplier
+ val observable = rxObservable {
+ Observable.range(0, n).collect { send(it) }
+ }
+ checkSingleValue(observable.toList()) { list ->
+ assertEquals((0 until n).toList(), list)
+ }
+ }
+
+ @Test
+ fun testSendAndCrash() {
+ val observable = rxObservable {
+ send("O")
+ throw IOException("K")
+ }
+ val single = rxSingle {
+ var result = ""
+ try {
+ observable.collect { result += it }
+ } catch(e: IOException) {
+ result += e.message
+ }
+ result
+ }
+ checkSingleValue(single) {
+ assertEquals("OK", it)
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
new file mode 100644
index 0000000..2a3ce04
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+import kotlin.test.*
+
+class ObservableSingleTest : TestBase() {
+ @Before
+ fun setup() {
+ ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+ }
+
+ @Test
+ fun testSingleNoWait() {
+ val observable = rxObservable {
+ send("OK")
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testSingleAwait() = runBlocking {
+ assertEquals("OK", Observable.just("O").awaitSingle() + "K")
+ }
+
+ @Test
+ fun testSingleEmitAndAwait() {
+ val observable = rxObservable {
+ send(Observable.just("O").awaitSingle() + "K")
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testSingleWithDelay() {
+ val observable = rxObservable {
+ send(Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K")
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testSingleException() {
+ val observable = rxObservable {
+ send(Observable.just("O", "K").awaitSingle() + "K")
+ }
+
+ checkErroneous(observable) {
+ assertTrue(it is IllegalArgumentException)
+ }
+ }
+
+ @Test
+ fun testAwaitFirst() {
+ val observable = rxObservable {
+ send(Observable.just("O", "#").awaitFirst() + "K")
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testAwaitFirstOrDefault() {
+ val observable = rxObservable {
+ send(Observable.empty<String>().awaitFirstOrDefault("O") + "K")
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testAwaitFirstOrDefaultWithValues() {
+ val observable = rxObservable {
+ send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K")
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testAwaitFirstOrNull() {
+ val observable = rxObservable<String> {
+ send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testAwaitFirstOrNullWithValues() {
+ val observable = rxObservable {
+ send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testAwaitFirstOrElse() {
+ val observable = rxObservable {
+ send(Observable.empty<String>().awaitFirstOrElse { "O" } + "K")
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testAwaitFirstOrElseWithValues() {
+ val observable = rxObservable {
+ send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K")
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testAwaitLast() {
+ val observable = rxObservable {
+ send(Observable.just("#", "O").awaitLast() + "K")
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testExceptionFromObservable() {
+ val observable = rxObservable {
+ try {
+ send(Observable.error<String>(RuntimeException("O")).awaitFirst())
+ } catch (e: RuntimeException) {
+ send(Observable.just(e.message!!).awaitLast() + "K")
+ }
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testExceptionFromCoroutine() {
+ val observable = rxObservable<String> {
+ throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
+ }
+
+ checkErroneous(observable) {
+ assertTrue(it is IllegalStateException)
+ assertEquals("OK", it.message)
+ }
+ }
+
+ @Test
+ fun testObservableIteration() {
+ val observable = rxObservable {
+ var result = ""
+ Observable.just("O", "K").collect { result += it }
+ send(result)
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testObservableIterationFailure() {
+ val observable = rxObservable {
+ try {
+ Observable.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
+ send("Fail")
+ } catch (e: RuntimeException) {
+ send(e.message!!)
+ }
+ }
+
+ checkSingleValue(observable) {
+ assertEquals("OK", it)
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt
new file mode 100644
index 0000000..2f04316
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.selects.*
+import org.junit.Test
+import kotlin.test.*
+
+class ObservableSubscriptionSelectTest : TestBase() {
+ @Test
+ fun testSelect() = runTest {
+ // source with n ints
+ val n = 1000 * stressTestMultiplier
+ val source = rxObservable { repeat(n) { send(it) } }
+ var a = 0
+ var b = 0
+ // open two subs
+ val channelA = source.openSubscription()
+ val channelB = source.openSubscription()
+ loop@ while (true) {
+ val done: Int = select {
+ channelA.onReceiveOrNull {
+ if (it != null) assertEquals(a++, it)
+ if (it == null) 0 else 1
+ }
+ channelB.onReceiveOrNull {
+ if (it != null) assertEquals(b++, it)
+ if (it == null) 0 else 2
+ }
+ }
+ when (done) {
+ 0 -> break@loop
+ 1 -> {
+ val r = channelB.receiveOrNull()
+ if (r != null) assertEquals(b++, r)
+ }
+ 2 -> {
+ val r = channelA.receiveOrNull()
+ if (r != null) assertEquals(a++, r)
+ }
+ }
+ }
+ channelA.cancel()
+ channelB.cancel()
+ // should receive one of them fully
+ assertTrue(a == n || b == n)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt
new file mode 100644
index 0000000..c6a6be5
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableTest.kt
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.plugins.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+import kotlin.test.*
+
+class ObservableTest : TestBase() {
+ @Before
+ fun setup() {
+ ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+ }
+
+ @Test
+ fun testBasicSuccess() = runBlocking {
+ expect(1)
+ val observable = rxObservable(currentDispatcher()) {
+ expect(4)
+ send("OK")
+ }
+ expect(2)
+ observable.subscribe { value ->
+ expect(5)
+ assertEquals("OK", value)
+ }
+ expect(3)
+ yield() // to started coroutine
+ finish(6)
+ }
+
+ @Test
+ fun testBasicFailure() = runBlocking {
+ expect(1)
+ val observable = rxObservable<String>(currentDispatcher()) {
+ expect(4)
+ throw RuntimeException("OK")
+ }
+ expect(2)
+ observable.subscribe({
+ expectUnreached()
+ }, { error ->
+ expect(5)
+ assertTrue(error is RuntimeException)
+ assertEquals("OK", error.message)
+ })
+ expect(3)
+ yield() // to started coroutine
+ finish(6)
+ }
+
+ @Test
+ fun testBasicUnsubscribe() = runBlocking {
+ expect(1)
+ val observable = rxObservable<String>(currentDispatcher()) {
+ expect(4)
+ yield() // back to main, will get cancelled
+ expectUnreached()
+ }
+ expect(2)
+ val sub = observable.subscribe({
+ expectUnreached()
+ }, {
+ expectUnreached()
+ })
+ expect(3)
+ yield() // to started coroutine
+ expect(5)
+ sub.dispose() // will cancel coroutine
+ yield()
+ finish(6)
+ }
+
+ @Test
+ fun testNotifyOnceOnCancellation() = runTest {
+ expect(1)
+ val observable =
+ rxObservable(currentDispatcher()) {
+ expect(5)
+ send("OK")
+ try {
+ delay(Long.MAX_VALUE)
+ } catch (e: CancellationException) {
+ expect(11)
+ }
+ }
+ .doOnNext {
+ expect(6)
+ assertEquals("OK", it)
+ }
+ .doOnDispose {
+ expect(10) // notified once!
+ }
+ expect(2)
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(3)
+ observable.collect {
+ expect(8)
+ assertEquals("OK", it)
+ }
+ }
+ expect(4)
+ yield() // to observable code
+ expect(7)
+ yield() // to consuming coroutines
+ expect(9)
+ job.cancel()
+ job.join()
+ finish(12)
+ }
+
+ @Test
+ fun testFailingConsumer() = runTest {
+ expect(1)
+ val pub = rxObservable(currentDispatcher()) {
+ expect(2)
+ send("OK")
+ try {
+ delay(Long.MAX_VALUE)
+ } catch (e: CancellationException) {
+ finish(5)
+ }
+ }
+ try {
+ pub.collect {
+ expect(3)
+ throw TestException()
+ }
+ } catch (e: TestException) {
+ expect(4)
+ }
+ }
+
+ @Test
+ fun testExceptionAfterCancellation() {
+ // Test that no exceptions were reported to the global EH (it will fail the test if so)
+ val handler = { e: Throwable ->
+ assertFalse(e is CancellationException)
+ }
+ withExceptionHandler(handler) {
+ RxJavaPlugins.setErrorHandler {
+ require(it !is CancellationException)
+ }
+ Observable
+ .interval(1, TimeUnit.MILLISECONDS)
+ .take(1000)
+ .switchMapSingle {
+ rxSingle {
+ timeBomb().await()
+ }
+ }
+ .blockingSubscribe({}, {})
+ }
+ }
+
+ private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt b/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt
new file mode 100644
index 0000000..9e95c21
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.schedulers.Schedulers
+import kotlinx.coroutines.*
+import org.junit.Before
+import org.junit.Test
+import kotlin.test.*
+
+class SchedulerTest : TestBase() {
+ @Before
+ fun setup() {
+ ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+ }
+
+ @Test
+ fun testIoScheduler(): Unit = runBlocking {
+ expect(1)
+ val mainThread = Thread.currentThread()
+ withContext(Schedulers.io().asCoroutineDispatcher()) {
+ val t1 = Thread.currentThread()
+ assertNotSame(t1, mainThread)
+ expect(2)
+ delay(100)
+ val t2 = Thread.currentThread()
+ assertNotSame(t2, mainThread)
+ expect(3)
+ }
+ finish(4)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
new file mode 100644
index 0000000..46bcaf8
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
+import io.reactivex.rxjava3.exceptions.*
+import io.reactivex.rxjava3.functions.*
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+import kotlin.test.*
+
+class SingleTest : TestBase() {
+ @Before
+ fun setup() {
+ ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+ }
+
+ @Test
+ fun testBasicSuccess() = runBlocking {
+ expect(1)
+ val single = rxSingle(currentDispatcher()) {
+ expect(4)
+ "OK"
+ }
+ expect(2)
+ single.subscribe { value ->
+ expect(5)
+ assertEquals("OK", value)
+ }
+ expect(3)
+ yield() // to started coroutine
+ finish(6)
+ }
+
+ @Test
+ fun testBasicFailure() = runBlocking {
+ expect(1)
+ val single = rxSingle(currentDispatcher()) {
+ expect(4)
+ throw RuntimeException("OK")
+ }
+ expect(2)
+ single.subscribe({
+ expectUnreached()
+ }, { error ->
+ expect(5)
+ assertTrue(error is RuntimeException)
+ assertEquals("OK", error.message)
+ })
+ expect(3)
+ yield() // to started coroutine
+ finish(6)
+ }
+
+
+ @Test
+ fun testBasicUnsubscribe() = runBlocking {
+ expect(1)
+ val single = rxSingle(currentDispatcher()) {
+ expect(4)
+ yield() // back to main, will get cancelled
+ expectUnreached()
+
+ }
+ expect(2)
+ // nothing is called on a disposed rx3 single
+ val sub = single.subscribe({
+ expectUnreached()
+ }, {
+ expectUnreached()
+ })
+ expect(3)
+ yield() // to started coroutine
+ expect(5)
+ sub.dispose() // will cancel coroutine
+ yield()
+ finish(6)
+ }
+
+ @Test
+ fun testSingleNoWait() {
+ val single = rxSingle {
+ "OK"
+ }
+
+ checkSingleValue(single) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testSingleAwait() = runBlocking {
+ assertEquals("OK", Single.just("O").await() + "K")
+ }
+
+ @Test
+ fun testSingleEmitAndAwait() {
+ val single = rxSingle {
+ Single.just("O").await() + "K"
+ }
+
+ checkSingleValue(single) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testSingleWithDelay() {
+ val single = rxSingle {
+ Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
+ }
+
+ checkSingleValue(single) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testSingleException() {
+ val single = rxSingle {
+ Observable.just("O", "K").awaitSingle() + "K"
+ }
+
+ checkErroneous(single) {
+ assert(it is IllegalArgumentException)
+ }
+ }
+
+ @Test
+ fun testAwaitFirst() {
+ val single = rxSingle {
+ Observable.just("O", "#").awaitFirst() + "K"
+ }
+
+ checkSingleValue(single) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testAwaitLast() {
+ val single = rxSingle {
+ Observable.just("#", "O").awaitLast() + "K"
+ }
+
+ checkSingleValue(single) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testExceptionFromObservable() {
+ val single = rxSingle {
+ try {
+ Observable.error<String>(RuntimeException("O")).awaitFirst()
+ } catch (e: RuntimeException) {
+ Observable.just(e.message!!).awaitLast() + "K"
+ }
+ }
+
+ checkSingleValue(single) {
+ assertEquals("OK", it)
+ }
+ }
+
+ @Test
+ fun testExceptionFromCoroutine() {
+ val single = rxSingle<String> {
+ throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
+ }
+
+ checkErroneous(single) {
+ assert(it is IllegalStateException)
+ assertEquals("OK", it.message)
+ }
+ }
+
+ @Test
+ fun testSuppressedException() = runTest {
+ val single = rxSingle(currentDispatcher()) {
+ launch(start = CoroutineStart.ATOMIC) {
+ throw TestException() // child coroutine fails
+ }
+ try {
+ delay(Long.MAX_VALUE)
+ } finally {
+ throw TestException2() // but parent throws another exception while cleaning up
+ }
+ }
+ try {
+ single.await()
+ expectUnreached()
+ } catch (e: TestException) {
+ assertTrue(e.suppressed[0] is TestException2)
+ }
+ }
+
+ @Test
+ fun testFatalExceptionInSubscribe() = runTest {
+ val handler = { e: Throwable ->
+ assertTrue(e is UndeliverableException && e.cause is LinkageError)
+ expect(2)
+ }
+ withExceptionHandler(handler) {
+ rxSingle(Dispatchers.Unconfined) {
+ expect(1)
+ 42
+ }.subscribe(Consumer {
+ throw LinkageError()
+ })
+ finish(3)
+ }
+ }
+
+ @Test
+ fun testFatalExceptionInSingle() = runTest {
+ rxSingle(Dispatchers.Unconfined) {
+ throw LinkageError()
+ }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) })
+
+ finish(2)
+ }
+
+ @Test
+ fun testUnhandledException() = runTest {
+ expect(1)
+ var disposable: Disposable? = null
+ val handler = { e: Throwable ->
+ assertTrue(e is UndeliverableException && e.cause is TestException)
+ expect(5)
+ }
+ val single = rxSingle(currentDispatcher()) {
+ expect(4)
+ disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
+ try {
+ delay(Long.MAX_VALUE)
+ } finally {
+ throw TestException() // would not be able to handle it since mono is disposed
+ }
+ }
+ withExceptionHandler(handler) {
+ single.subscribe(object : SingleObserver<Unit> {
+ override fun onSubscribe(d: Disposable) {
+ expect(2)
+ disposable = d
+ }
+
+ override fun onSuccess(t: Unit) {
+ expectUnreached()
+ }
+
+ override fun onError(t: Throwable) {
+ expectUnreached()
+ }
+ })
+ expect(3)
+ yield() // run coroutine
+ finish(6)
+ }
+ }
+}
diff --git a/settings.gradle b/settings.gradle
index 95fcd7c..759628f 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -34,6 +34,7 @@
module('reactive/kotlinx-coroutines-reactor')
module('reactive/kotlinx-coroutines-jdk9')
module('reactive/kotlinx-coroutines-rx2')
+module('reactive/kotlinx-coroutines-rx3')
module('ui/kotlinx-coroutines-android')
module('ui/kotlinx-coroutines-android/android-unit-tests')
module('ui/kotlinx-coroutines-javafx')