Flow.asObservable and Flow.asFlowable converters in rx2 module
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt
index 791690c..3a99ada 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt
@@ -28,6 +28,8 @@
 	public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe;
 	public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
 	public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
+	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
+	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
 }
 
 public final class kotlinx/coroutines/rx2/RxFlowableKt {
diff --git a/gradle.properties b/gradle.properties
index 11cc74e..84d0494 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -12,6 +12,7 @@
 bintray_version=1.8.4-jetbrains-5
 byte_buddy_version=1.9.3
 reactor_vesion=3.2.5.RELEASE
+reactive_streams_version=1.0.2
 artifactory_plugin_version=4.7.3
 
 # JS
diff --git a/reactive/kotlinx-coroutines-reactive/build.gradle b/reactive/kotlinx-coroutines-reactive/build.gradle
index 5c32170..d5139a1 100644
--- a/reactive/kotlinx-coroutines-reactive/build.gradle
+++ b/reactive/kotlinx-coroutines-reactive/build.gradle
@@ -2,8 +2,6 @@
  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
 
-ext.reactive_streams_version = '1.0.2'
-
 dependencies {
     compile "org.reactivestreams:reactive-streams:$reactive_streams_version"
     testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
index 1b7d740..246e5b9 100644
--- a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
@@ -11,7 +11,7 @@
 import kotlin.coroutines.*
 
 /**
- * Transforms the given flow to a spec-compliant [Publisher]
+ * Transforms the given flow to a spec-compliant [Publisher].
  */
 @JvmName("from")
 @FlowPreview
@@ -19,7 +19,7 @@
 
 /**
  * Adapter that transforms [Flow] into TCK-complaint [Publisher].
- * Any calls to [cancel] cancel the original flow.
+ * [cancel] invocation cancels the original flow.
  */
 @Suppress("PublisherImplementation")
 private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
diff --git a/reactive/kotlinx-coroutines-rx2/build.gradle b/reactive/kotlinx-coroutines-rx2/build.gradle
index bf623a2..f87c89d 100644
--- a/reactive/kotlinx-coroutines-rx2/build.gradle
+++ b/reactive/kotlinx-coroutines-rx2/build.gradle
@@ -4,6 +4,8 @@
 
 dependencies {
     compile project(':kotlinx-coroutines-reactive')
+    testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output
+    testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
     compile 'io.reactivex.rxjava2:rxjava:2.1.9'
 }
 
@@ -11,4 +13,23 @@
     externalDocumentationLink {
         url = new URL('http://reactivex.io/RxJava/javadoc/')
     }
-}
\ No newline at end of file
+}
+
+task testNG(type: Test) {
+    useTestNG()
+    reports.html.destination = file("$buildDir/reports/testng")
+    include '**/*ReactiveStreamTckTest.*'
+    // Skip testNG when tests are filtered with --tests, otherwise it simply fails
+    onlyIf {
+        filter.includePatterns.isEmpty()
+    }
+    doFirst {
+        // Classic gradle, nothing works without doFirst
+        println "TestNG tests: ($includes)"
+    }
+}
+
+test {
+    dependsOn(testNG)
+    reports.html.destination = file("$buildDir/reports/junit")
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index e206ccc..78b35c3 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -7,6 +7,9 @@
 import io.reactivex.*
 import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.flow.*
+import org.reactivestreams.*
 import kotlin.coroutines.*
 
 /**
@@ -76,3 +79,40 @@
     for (t in this@asObservable)
         send(t)
 }
+
+/**
+ * Converts the given flow to a cold observable.
+ * The original flow is cancelled if the observable subscriber was disposed.
+ */
+@FlowPreview
+@JvmName("from")
+public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create { emitter ->
+    /*
+     * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
+     * asObservable is already invoked from unconfined
+     */
+    val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
+        try {
+            collect { value -> emitter.onNext(value) }
+            emitter.onComplete()
+        } catch (e: Throwable) {
+            // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
+            if (e !is CancellationException) emitter.onError(e)
+            else emitter.onComplete()
+
+        }
+    }
+    emitter.setCancellable(RxCancellable(job))
+}
+
+/**
+ * Converts the given flow to a cold observable.
+ * The original flow is cancelled if the flowable subscriber was disposed.
+ */
+@FlowPreview
+@JvmName("from")
+public fun <T: Any> Flow<T>.asFlowable(): Flowable<T> = FlowAsFlowable(asPublisher())
+
+private class FlowAsFlowable<T: Any>(private val publisher: Publisher<T>) : Flowable<T>() {
+    override fun subscribeActual(s: Subscriber<in T>?) = publisher.subscribe(s)
+}
diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt
new file mode 100644
index 0000000..65d9d67
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.hamcrest.core.*
+import org.junit.*
+import org.junit.Assert.*
+
+class FlowAsObservableTest : TestBase() {
+    @Test
+    fun testBasicSuccess() = runTest {
+        expect(1)
+        val observable = flow {
+            expect(3)
+            emit("OK")
+        }.asObservable()
+
+        expect(2)
+        observable.subscribe { value ->
+            expect(4)
+            assertEquals("OK", value)
+        }
+
+        finish(5)
+    }
+
+    @Test
+    fun testBasicFailure() = runTest {
+        expect(1)
+        val observable = flow<Int> {
+            expect(3)
+            throw RuntimeException("OK")
+        }.asObservable()
+
+        expect(2)
+        observable.subscribe({ expectUnreached() }, { error ->
+            expect(4)
+            assertThat(error, IsInstanceOf(RuntimeException::class.java))
+            assertEquals("OK", error.message)
+        })
+        finish(5)
+    }
+
+    @Test
+    fun testBasicUnsubscribe() = runTest {
+        expect(1)
+        val observable = flow<Int> {
+            expect(3)
+            hang {
+                expect(4)
+            }
+        }.asObservable()
+
+        expect(2)
+        val sub = observable.subscribe({ expectUnreached() }, { expectUnreached() })
+        sub.dispose() // will cancel coroutine
+        finish(5)
+    }
+
+    @Test
+    fun testNotifyOnceOnCancellation() = runTest {
+        val observable =
+            flow {
+                expect(3)
+                emit("OK")
+                hang {
+                    expect(7)
+                }
+            }.asObservable()
+                .doOnNext {
+                    expect(4)
+                    assertEquals("OK", it)
+                }
+                .doOnDispose {
+                    expect(6) // notified once!
+                }
+
+        expect(1)
+        val job = launch(start = CoroutineStart.UNDISPATCHED) {
+            expect(2)
+            observable.consumeEach {
+                expect(5)
+                assertEquals("OK", it)
+            }
+        }
+
+        yield()
+        job.cancelAndJoin()
+        finish(8)
+    }
+
+    @Test
+    fun testFailingConsumer() = runTest {
+        expect(1)
+        val observable = flow {
+            expect(2)
+            emit("OK")
+            hang {
+                expect(4)
+            }
+
+        }.asObservable()
+
+        try {
+            observable.consumeEach {
+                expect(3)
+                throw TestException()
+            }
+        } catch (e: TestException) {
+            finish(5)
+        }
+    }
+
+    @Test
+    fun testNonAtomicStart() = runTest {
+        withContext(Dispatchers.Unconfined) {
+            val observable = flow<Int> {
+                expect(1)
+            }.asObservable()
+
+            val disposable = observable.subscribe({ expectUnreached() }, { expectUnreached() }, { expectUnreached() })
+            disposable.dispose()
+        }
+        finish(2)
+    }
+
+    @Test
+    fun testFlowCancelledFromWithin() = runTest {
+        val observable = flow {
+            expect(1)
+            emit(1)
+            kotlin.coroutines.coroutineContext.cancel()
+            kotlin.coroutines.coroutineContext.ensureActive()
+            expectUnreached()
+        }.asObservable()
+
+        observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) })
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx2/test/IterableFlowAsFlowableTckTest.kt b/reactive/kotlinx-coroutines-rx2/test/IterableFlowAsFlowableTckTest.kt
new file mode 100644
index 0000000..cc22e33
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/IterableFlowAsFlowableTckTest.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import io.reactivex.*
+import kotlinx.coroutines.flow.*
+import org.junit.*
+import org.reactivestreams.*
+import org.reactivestreams.tck.*
+
+class IterableFlowAsFlowableTckTest : PublisherVerification<Long>(TestEnvironment()) {
+
+    private fun generate(num: Long): Array<Long> {
+        return Array(if (num >= Integer.MAX_VALUE) 1000000 else num.toInt()) { it.toLong() }
+    }
+
+    override fun createPublisher(elements: Long): Flowable<Long> {
+        return generate(elements).asIterable().asFlow().asFlowable()
+    }
+
+    override fun createFailedPublisher(): Publisher<Long>? = null
+
+    @Ignore
+    override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
+    }
+
+    @Ignore
+    override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
+    }
+
+    @Ignore
+    override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() {
+        //
+    }
+}