Flow.asObservable and Flow.asFlowable converters in rx2 module
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt
index 791690c..3a99ada 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt
@@ -28,6 +28,8 @@
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe;
public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
+ public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
+ public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
}
public final class kotlinx/coroutines/rx2/RxFlowableKt {
diff --git a/gradle.properties b/gradle.properties
index 11cc74e..84d0494 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -12,6 +12,7 @@
bintray_version=1.8.4-jetbrains-5
byte_buddy_version=1.9.3
reactor_vesion=3.2.5.RELEASE
+reactive_streams_version=1.0.2
artifactory_plugin_version=4.7.3
# JS
diff --git a/reactive/kotlinx-coroutines-reactive/build.gradle b/reactive/kotlinx-coroutines-reactive/build.gradle
index 5c32170..d5139a1 100644
--- a/reactive/kotlinx-coroutines-reactive/build.gradle
+++ b/reactive/kotlinx-coroutines-reactive/build.gradle
@@ -2,8 +2,6 @@
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-ext.reactive_streams_version = '1.0.2'
-
dependencies {
compile "org.reactivestreams:reactive-streams:$reactive_streams_version"
testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
index 1b7d740..246e5b9 100644
--- a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
@@ -11,7 +11,7 @@
import kotlin.coroutines.*
/**
- * Transforms the given flow to a spec-compliant [Publisher]
+ * Transforms the given flow to a spec-compliant [Publisher].
*/
@JvmName("from")
@FlowPreview
@@ -19,7 +19,7 @@
/**
* Adapter that transforms [Flow] into TCK-complaint [Publisher].
- * Any calls to [cancel] cancel the original flow.
+ * [cancel] invocation cancels the original flow.
*/
@Suppress("PublisherImplementation")
private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
diff --git a/reactive/kotlinx-coroutines-rx2/build.gradle b/reactive/kotlinx-coroutines-rx2/build.gradle
index bf623a2..f87c89d 100644
--- a/reactive/kotlinx-coroutines-rx2/build.gradle
+++ b/reactive/kotlinx-coroutines-rx2/build.gradle
@@ -4,6 +4,8 @@
dependencies {
compile project(':kotlinx-coroutines-reactive')
+ testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output
+ testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
compile 'io.reactivex.rxjava2:rxjava:2.1.9'
}
@@ -11,4 +13,23 @@
externalDocumentationLink {
url = new URL('http://reactivex.io/RxJava/javadoc/')
}
-}
\ No newline at end of file
+}
+
+task testNG(type: Test) {
+ useTestNG()
+ reports.html.destination = file("$buildDir/reports/testng")
+ include '**/*ReactiveStreamTckTest.*'
+ // Skip testNG when tests are filtered with --tests, otherwise it simply fails
+ onlyIf {
+ filter.includePatterns.isEmpty()
+ }
+ doFirst {
+ // Classic gradle, nothing works without doFirst
+ println "TestNG tests: ($includes)"
+ }
+}
+
+test {
+ dependsOn(testNG)
+ reports.html.destination = file("$buildDir/reports/junit")
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index e206ccc..78b35c3 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -7,6 +7,9 @@
import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.flow.*
+import org.reactivestreams.*
import kotlin.coroutines.*
/**
@@ -76,3 +79,40 @@
for (t in this@asObservable)
send(t)
}
+
+/**
+ * Converts the given flow to a cold observable.
+ * The original flow is cancelled if the observable subscriber was disposed.
+ */
+@FlowPreview
+@JvmName("from")
+public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create { emitter ->
+ /*
+ * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
+ * asObservable is already invoked from unconfined
+ */
+ val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
+ try {
+ collect { value -> emitter.onNext(value) }
+ emitter.onComplete()
+ } catch (e: Throwable) {
+ // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
+ if (e !is CancellationException) emitter.onError(e)
+ else emitter.onComplete()
+
+ }
+ }
+ emitter.setCancellable(RxCancellable(job))
+}
+
+/**
+ * Converts the given flow to a cold observable.
+ * The original flow is cancelled if the flowable subscriber was disposed.
+ */
+@FlowPreview
+@JvmName("from")
+public fun <T: Any> Flow<T>.asFlowable(): Flowable<T> = FlowAsFlowable(asPublisher())
+
+private class FlowAsFlowable<T: Any>(private val publisher: Publisher<T>) : Flowable<T>() {
+ override fun subscribeActual(s: Subscriber<in T>?) = publisher.subscribe(s)
+}
diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt
new file mode 100644
index 0000000..65d9d67
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.hamcrest.core.*
+import org.junit.*
+import org.junit.Assert.*
+
+class FlowAsObservableTest : TestBase() {
+ @Test
+ fun testBasicSuccess() = runTest {
+ expect(1)
+ val observable = flow {
+ expect(3)
+ emit("OK")
+ }.asObservable()
+
+ expect(2)
+ observable.subscribe { value ->
+ expect(4)
+ assertEquals("OK", value)
+ }
+
+ finish(5)
+ }
+
+ @Test
+ fun testBasicFailure() = runTest {
+ expect(1)
+ val observable = flow<Int> {
+ expect(3)
+ throw RuntimeException("OK")
+ }.asObservable()
+
+ expect(2)
+ observable.subscribe({ expectUnreached() }, { error ->
+ expect(4)
+ assertThat(error, IsInstanceOf(RuntimeException::class.java))
+ assertEquals("OK", error.message)
+ })
+ finish(5)
+ }
+
+ @Test
+ fun testBasicUnsubscribe() = runTest {
+ expect(1)
+ val observable = flow<Int> {
+ expect(3)
+ hang {
+ expect(4)
+ }
+ }.asObservable()
+
+ expect(2)
+ val sub = observable.subscribe({ expectUnreached() }, { expectUnreached() })
+ sub.dispose() // will cancel coroutine
+ finish(5)
+ }
+
+ @Test
+ fun testNotifyOnceOnCancellation() = runTest {
+ val observable =
+ flow {
+ expect(3)
+ emit("OK")
+ hang {
+ expect(7)
+ }
+ }.asObservable()
+ .doOnNext {
+ expect(4)
+ assertEquals("OK", it)
+ }
+ .doOnDispose {
+ expect(6) // notified once!
+ }
+
+ expect(1)
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ observable.consumeEach {
+ expect(5)
+ assertEquals("OK", it)
+ }
+ }
+
+ yield()
+ job.cancelAndJoin()
+ finish(8)
+ }
+
+ @Test
+ fun testFailingConsumer() = runTest {
+ expect(1)
+ val observable = flow {
+ expect(2)
+ emit("OK")
+ hang {
+ expect(4)
+ }
+
+ }.asObservable()
+
+ try {
+ observable.consumeEach {
+ expect(3)
+ throw TestException()
+ }
+ } catch (e: TestException) {
+ finish(5)
+ }
+ }
+
+ @Test
+ fun testNonAtomicStart() = runTest {
+ withContext(Dispatchers.Unconfined) {
+ val observable = flow<Int> {
+ expect(1)
+ }.asObservable()
+
+ val disposable = observable.subscribe({ expectUnreached() }, { expectUnreached() }, { expectUnreached() })
+ disposable.dispose()
+ }
+ finish(2)
+ }
+
+ @Test
+ fun testFlowCancelledFromWithin() = runTest {
+ val observable = flow {
+ expect(1)
+ emit(1)
+ kotlin.coroutines.coroutineContext.cancel()
+ kotlin.coroutines.coroutineContext.ensureActive()
+ expectUnreached()
+ }.asObservable()
+
+ observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) })
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx2/test/IterableFlowAsFlowableTckTest.kt b/reactive/kotlinx-coroutines-rx2/test/IterableFlowAsFlowableTckTest.kt
new file mode 100644
index 0000000..cc22e33
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/IterableFlowAsFlowableTckTest.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import io.reactivex.*
+import kotlinx.coroutines.flow.*
+import org.junit.*
+import org.reactivestreams.*
+import org.reactivestreams.tck.*
+
+class IterableFlowAsFlowableTckTest : PublisherVerification<Long>(TestEnvironment()) {
+
+ private fun generate(num: Long): Array<Long> {
+ return Array(if (num >= Integer.MAX_VALUE) 1000000 else num.toInt()) { it.toLong() }
+ }
+
+ override fun createPublisher(elements: Long): Flowable<Long> {
+ return generate(elements).asIterable().asFlow().asFlowable()
+ }
+
+ override fun createFailedPublisher(): Publisher<Long>? = null
+
+ @Ignore
+ override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
+ }
+
+ @Ignore
+ override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
+ }
+
+ @Ignore
+ override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() {
+ //
+ }
+}