Support context in Flow.asPublisher and similar methods (#2156)


Fixes #2155

Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
diff --git a/reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api b/reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api
index d4bc169..1f5bdec 100644
--- a/reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api
+++ b/reactive/kotlinx-coroutines-jdk9/api/kotlinx-coroutines-jdk9.api
@@ -15,6 +15,8 @@
 public final class kotlinx/coroutines/jdk9/ReactiveFlowKt {
 	public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Ljava/util/concurrent/Flow$Publisher;
+	public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Ljava/util/concurrent/Flow$Publisher;
+	public static synthetic fun asPublisher$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher;
 	public static final fun collect (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 }
 
diff --git a/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt
index 89caf82..5d546df 100644
--- a/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt
+++ b/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt
@@ -4,12 +4,14 @@
 
 package kotlinx.coroutines.jdk9
 
+import kotlinx.coroutines.*
 import kotlinx.coroutines.flow.*
 import kotlinx.coroutines.reactive.asFlow
 import kotlinx.coroutines.reactive.asPublisher
 import kotlinx.coroutines.reactive.collect
+import org.reactivestreams.*
+import kotlin.coroutines.*
 import java.util.concurrent.Flow as JFlow
-import org.reactivestreams.FlowAdapters
 
 /**
  * Transforms the given reactive [Publisher] into [Flow].
@@ -25,9 +27,15 @@
 
 /**
  * Transforms the given flow to a reactive specification compliant [Publisher].
+ *
+ * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
+ * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
+ * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
+ * is used, so calls are performed from an arbitrary thread.
  */
-public fun <T : Any> Flow<T>.asPublisher(): JFlow.Publisher<T> {
-    val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>()
+@JvmOverloads // binary compatibility
+public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> {
+    val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>(context)
     return FlowAdapters.toFlowPublisher(reactivePublisher)
 }
 
diff --git a/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api b/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api
index bed065d..5783ede 100644
--- a/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api
+++ b/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api
@@ -32,7 +32,7 @@
 public final class kotlinx/coroutines/reactive/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, org/reactivestreams/Subscription {
 	public final field flow Lkotlinx/coroutines/flow/Flow;
 	public final field subscriber Lorg/reactivestreams/Subscriber;
-	public fun <init> (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V
+	public fun <init> (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;Lkotlin/coroutines/CoroutineContext;)V
 	public fun cancel ()V
 	public fun request (J)V
 }
@@ -65,5 +65,7 @@
 public final class kotlinx/coroutines/reactive/ReactiveFlowKt {
 	public static final fun asFlow (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
+	public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
+	public static synthetic fun asPublisher$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
 }
 
diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
index efa9c9c..a51f583 100644
--- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
@@ -34,8 +34,15 @@
  *
  * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
  * see its documentation for additional details.
+ *
+ * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
+ * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
+ * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
+ * is used, so calls are performed from an arbitrary thread.
  */
-public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
+@JvmOverloads // binary compatibility
+public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> =
+    FlowAsPublisher(this, Dispatchers.Unconfined + context)
 
 private class PublisherAsFlow<T : Any>(
     private val publisher: Publisher<T>,
@@ -153,11 +160,14 @@
  * Adapter that transforms [Flow] into TCK-complaint [Publisher].
  * [cancel] invocation cancels the original flow.
  */
-@Suppress("PublisherImplementation")
-private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
+@Suppress("ReactiveStreamsPublisherImplementation")
+private class FlowAsPublisher<T : Any>(
+    private val flow: Flow<T>,
+    private val context: CoroutineContext
+) : Publisher<T> {
     override fun subscribe(subscriber: Subscriber<in T>?) {
         if (subscriber == null) throw NullPointerException()
-        subscriber.onSubscribe(FlowSubscription(flow, subscriber))
+        subscriber.onSubscribe(FlowSubscription(flow, subscriber, context))
     }
 }
 
@@ -165,8 +175,9 @@
 @InternalCoroutinesApi
 public class FlowSubscription<T>(
     @JvmField public val flow: Flow<T>,
-    @JvmField public val subscriber: Subscriber<in T>
-) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, true) {
+    @JvmField public val subscriber: Subscriber<in T>,
+    context: CoroutineContext
+) : Subscription, AbstractCoroutine<Unit>(context, true) {
     private val requested = atomic(0L)
     private val producer = atomic<Continuation<Unit>?>(createInitialContinuation())
 
diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
index c044d92..e7b8cb1 100644
--- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
@@ -8,10 +8,10 @@
 import kotlinx.coroutines.flow.*
 import org.junit.Test
 import org.reactivestreams.*
+import java.util.concurrent.*
 import kotlin.test.*
 
 class FlowAsPublisherTest : TestBase() {
-
     @Test
     fun testErrorOnCancellationIsReported() {
         expect(1)
@@ -75,4 +75,78 @@
         })
         finish(4)
     }
+
+    @Test
+    fun testUnconfinedDefaultContext() {
+        expect(1)
+        val thread = Thread.currentThread()
+        fun checkThread() {
+            assertSame(thread, Thread.currentThread())
+        }
+        flowOf(42).asPublisher().subscribe(object : Subscriber<Int> {
+            private lateinit var subscription: Subscription
+
+            override fun onSubscribe(s: Subscription) {
+                expect(2)
+                subscription = s
+                subscription.request(2)
+            }
+
+            override fun onNext(t: Int) {
+                checkThread()
+                expect(3)
+                assertEquals(42, t)
+            }
+
+            override fun onComplete() {
+                checkThread()
+                expect(4)
+            }
+
+            override fun onError(t: Throwable?) {
+                expectUnreached()
+            }
+        })
+        finish(5)
+    }
+
+    @Test
+    fun testConfinedContext() {
+        expect(1)
+        val threadName = "FlowAsPublisherTest.testConfinedContext"
+        fun checkThread() {
+            val currentThread = Thread.currentThread()
+            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
+        }
+        val completed = CountDownLatch(1)
+        newSingleThreadContext(threadName).use { dispatcher ->
+            flowOf(42).asPublisher(dispatcher).subscribe(object : Subscriber<Int> {
+                private lateinit var subscription: Subscription
+
+                override fun onSubscribe(s: Subscription) {
+                    expect(2)
+                    subscription = s
+                    subscription.request(2)
+                }
+
+                override fun onNext(t: Int) {
+                    checkThread()
+                    expect(3)
+                    assertEquals(42, t)
+                }
+
+                override fun onComplete() {
+                    checkThread()
+                    expect(4)
+                    completed.countDown()
+                }
+
+                override fun onError(t: Throwable?) {
+                    expectUnreached()
+                }
+            })
+            completed.await()
+        }
+        finish(5)
+    }
 }
diff --git a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api
index 422f36b..3b5c6b9 100644
--- a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api
+++ b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api
@@ -38,6 +38,8 @@
 
 public final class kotlinx/coroutines/reactor/ReactorFlowKt {
 	public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;)Lreactor/core/publisher/Flux;
+	public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lreactor/core/publisher/Flux;
+	public static synthetic fun asFlux$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lreactor/core/publisher/Flux;
 }
 
 public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay {
diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt
index d665c88..a478ab1 100644
--- a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt
@@ -4,25 +4,38 @@
 
 package kotlinx.coroutines.reactor
 
+import kotlinx.coroutines.*
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.flowOn
 import kotlinx.coroutines.reactive.FlowSubscription
+import org.reactivestreams.*
 import reactor.core.CoreSubscriber
 import reactor.core.publisher.Flux
+import kotlin.coroutines.*
 
 /**
  * Converts the given flow to a cold flux.
  * The original flow is cancelled when the flux subscriber is disposed.
  *
  * This function is integrated with [ReactorContext], see its documentation for additional details.
+ *
+ * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
+ * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
+ * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
+ * is used, so calls are performed from an arbitrary thread.
  */
-public fun <T: Any> Flow<T>.asFlux(): Flux<T> = FlowAsFlux(this)
+@JvmOverloads // binary compatibility
+public fun <T: Any> Flow<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T> =
+    FlowAsFlux(this, Dispatchers.Unconfined + context)
 
-private class FlowAsFlux<T : Any>(private val flow: Flow<T>) : Flux<T>() {
+private class FlowAsFlux<T : Any>(
+    private val flow: Flow<T>,
+    private val context: CoroutineContext
+) : Flux<T>() {
     override fun subscribe(subscriber: CoreSubscriber<in T>?) {
         if (subscriber == null) throw NullPointerException()
         val hasContext = !subscriber.currentContext().isEmpty
         val source = if (hasContext) flow.flowOn(subscriber.currentContext().asCoroutineContext()) else flow
-        subscriber.onSubscribe(FlowSubscription(source, subscriber))
+        subscriber.onSubscribe(FlowSubscription(source, subscriber, context))
     }
 }
diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
index e4bd8b3..cecc895 100644
--- a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
@@ -4,10 +4,13 @@
 import kotlinx.coroutines.flow.*
 import kotlinx.coroutines.reactive.*
 import org.junit.Test
+import org.reactivestreams.*
 import reactor.core.publisher.*
 import reactor.util.context.Context
+import java.util.concurrent.*
 import kotlin.test.*
 
+@Suppress("ReactiveStreamsSubscriberImplementation")
 class FlowAsFluxTest : TestBase() {
     @Test
     fun testFlowAsFluxContextPropagation() {
@@ -68,4 +71,78 @@
         }
         finish(4)
     }
-}
\ No newline at end of file
+
+    @Test
+    fun testUnconfinedDefaultContext() {
+        expect(1)
+        val thread = Thread.currentThread()
+        fun checkThread() {
+            assertSame(thread, Thread.currentThread())
+        }
+        flowOf(42).asFlux().subscribe(object : Subscriber<Int> {
+            private lateinit var subscription: Subscription
+
+            override fun onSubscribe(s: Subscription) {
+                expect(2)
+                subscription = s
+                subscription.request(2)
+            }
+
+            override fun onNext(t: Int) {
+                checkThread()
+                expect(3)
+                assertEquals(42, t)
+            }
+
+            override fun onComplete() {
+                checkThread()
+                expect(4)
+            }
+
+            override fun onError(t: Throwable?) {
+                expectUnreached()
+            }
+        })
+        finish(5)
+    }
+
+    @Test
+    fun testConfinedContext() {
+        expect(1)
+        val threadName = "FlowAsFluxTest.testConfinedContext"
+        fun checkThread() {
+            val currentThread = Thread.currentThread()
+            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
+        }
+        val completed = CountDownLatch(1)
+        newSingleThreadContext(threadName).use { dispatcher ->
+            flowOf(42).asFlux(dispatcher).subscribe(object : Subscriber<Int> {
+                private lateinit var subscription: Subscription
+
+                override fun onSubscribe(s: Subscription) {
+                    expect(2)
+                    subscription = s
+                    subscription.request(2)
+                }
+
+                override fun onNext(t: Int) {
+                    checkThread()
+                    expect(3)
+                    assertEquals(42, t)
+                }
+
+                override fun onComplete() {
+                    checkThread()
+                    expect(4)
+                    completed.countDown()
+                }
+
+                override fun onError(t: Throwable?) {
+                    expectUnreached()
+                }
+            })
+            completed.await()
+        }
+        finish(5)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
index 22f4038..06ddb68 100644
--- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
+++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
@@ -35,6 +35,10 @@
 	public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
 	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
 	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
+	public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
+	public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
+	public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
+	public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
 }
 
 public final class kotlinx/coroutines/rx2/RxFlowableKt {
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index 0be606f..264cdad 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -10,6 +10,7 @@
 import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.flow.*
 import kotlinx.coroutines.reactive.*
+import org.reactivestreams.*
 import java.util.concurrent.atomic.*
 import kotlin.coroutines.*
 
@@ -106,15 +107,21 @@
 /**
  * Converts the given flow to a cold observable.
  * The original flow is cancelled when the observable subscriber is disposed.
+ *
+ * An optional [context] can be specified to control the execution context of calls to [Observer] methods.
+ * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
+ * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
+ * is used, so calls are performed from an arbitrary thread.
  */
+@JvmOverloads // binary compatibility
 @JvmName("from")
 @ExperimentalCoroutinesApi
-public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create { emitter ->
+public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
     /*
      * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
      * asObservable is already invoked from unconfined
      */
-    val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
+    val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) {
         try {
             collect { value -> emitter.onNext(value) }
             emitter.onComplete()
@@ -135,7 +142,14 @@
 /**
  * Converts the given flow to a cold flowable.
  * The original flow is cancelled when the flowable subscriber is disposed.
+ *
+ * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
+ * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
+ * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
+ * is used, so calls are performed from an arbitrary thread.
  */
+@JvmOverloads // binary compatibility
 @JvmName("from")
 @ExperimentalCoroutinesApi
-public fun <T: Any> Flow<T>.asFlowable(): Flowable<T> = Flowable.fromPublisher(asPublisher())
+public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
+    Flowable.fromPublisher(asPublisher(context))
diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowAsFlowableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowAsFlowableTest.kt
new file mode 100644
index 0000000..1cbded6
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/FlowAsFlowableTest.kt
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.junit.Test
+import org.reactivestreams.*
+import java.util.concurrent.*
+import kotlin.test.*
+
+@Suppress("ReactiveStreamsSubscriberImplementation")
+class FlowAsFlowableTest : TestBase() {
+    @Test
+    fun testUnconfinedDefaultContext() {
+        expect(1)
+        val thread = Thread.currentThread()
+        fun checkThread() {
+            assertSame(thread, Thread.currentThread())
+        }
+        flowOf(42).asFlowable().subscribe(object : Subscriber<Int> {
+            private lateinit var subscription: Subscription
+
+            override fun onSubscribe(s: Subscription) {
+                expect(2)
+                subscription = s
+                subscription.request(2)
+            }
+
+            override fun onNext(t: Int) {
+                checkThread()
+                expect(3)
+                assertEquals(42, t)
+            }
+
+            override fun onComplete() {
+                checkThread()
+                expect(4)
+            }
+
+            override fun onError(t: Throwable?) {
+                expectUnreached()
+            }
+        })
+        finish(5)
+    }
+
+    @Test
+    fun testConfinedContext() {
+        expect(1)
+        val threadName = "FlowAsFlowableTest.testConfinedContext"
+        fun checkThread() {
+            val currentThread = Thread.currentThread()
+            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
+        }
+        val completed = CountDownLatch(1)
+        newSingleThreadContext(threadName).use { dispatcher ->
+            flowOf(42).asFlowable(dispatcher).subscribe(object : Subscriber<Int> {
+                private lateinit var subscription: Subscription
+
+                override fun onSubscribe(s: Subscription) {
+                    expect(2)
+                    subscription = s
+                    subscription.request(2)
+                }
+
+                override fun onNext(t: Int) {
+                    checkThread()
+                    expect(3)
+                    assertEquals(42, t)
+                }
+
+                override fun onComplete() {
+                    checkThread()
+                    expect(4)
+                    completed.countDown()
+                }
+
+                override fun onError(t: Throwable?) {
+                    expectUnreached()
+                }
+            })
+            completed.await()
+        }
+        finish(5)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt
index 0908b34..3cde182 100644
--- a/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt
@@ -4,9 +4,12 @@
 
 package kotlinx.coroutines.rx2
 
+import io.reactivex.*
+import io.reactivex.disposables.*
 import kotlinx.coroutines.*
 import kotlinx.coroutines.flow.*
 import org.junit.Test
+import java.util.concurrent.*
 import kotlin.test.*
 
 class FlowAsObservableTest : TestBase() {
@@ -139,4 +142,70 @@
 
         observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) })
     }
+
+    @Test
+    fun testUnconfinedDefaultContext() {
+        expect(1)
+        val thread = Thread.currentThread()
+        fun checkThread() {
+            assertSame(thread, Thread.currentThread())
+        }
+        flowOf(42).asObservable().subscribe(object : Observer<Int> {
+            override fun onSubscribe(d: Disposable) {
+                expect(2)
+            }
+
+            override fun onNext(t: Int) {
+                checkThread()
+                expect(3)
+                assertEquals(42, t)
+            }
+
+            override fun onComplete() {
+                checkThread()
+                expect(4)
+            }
+
+            override fun onError(t: Throwable) {
+                expectUnreached()
+            }
+        })
+        finish(5)
+    }
+
+    @Test
+    fun testConfinedContext() {
+        expect(1)
+        val threadName = "FlowAsObservableTest.testConfinedContext"
+        fun checkThread() {
+            val currentThread = Thread.currentThread()
+            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
+        }
+        val completed = CountDownLatch(1)
+        newSingleThreadContext(threadName).use { dispatcher ->
+            flowOf(42).asObservable(dispatcher).subscribe(object : Observer<Int> {
+                override fun onSubscribe(d: Disposable) {
+                    expect(2)
+                }
+
+                override fun onNext(t: Int) {
+                    checkThread()
+                    expect(3)
+                    assertEquals(42, t)
+                }
+
+                override fun onComplete() {
+                    checkThread()
+                    expect(4)
+                    completed.countDown()
+                }
+
+                override fun onError(e: Throwable) {
+                    expectUnreached()
+                }
+            })
+            completed.await()
+        }
+        finish(5)
+    }
 }
diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
index 27c3d3d..4f15eda 100644
--- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
+++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
@@ -30,6 +30,10 @@
 	public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Single;
 	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
 	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
+	public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
+	public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
+	public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
+	public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
 }
 
 public final class kotlinx/coroutines/rx3/RxFlowableKt {
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
index f9e2e21..c7ab237 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
@@ -10,6 +10,7 @@
 import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.flow.*
 import kotlinx.coroutines.reactive.*
+import org.reactivestreams.*
 import java.util.concurrent.atomic.*
 import kotlin.coroutines.*
 
@@ -91,15 +92,21 @@
 /**
  * Converts the given flow to a cold observable.
  * The original flow is cancelled when the observable subscriber is disposed.
+ *
+ * An optional [context] can be specified to control the execution context of calls to [Observer] methods.
+ * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
+ * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
+ * is used, so calls are performed from an arbitrary thread.
  */
+@JvmOverloads // binary compatibility
 @JvmName("from")
 @ExperimentalCoroutinesApi
-public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create { emitter ->
+public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
     /*
      * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
      * asObservable is already invoked from unconfined
      */
-    val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
+    val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) {
         try {
             collect { value -> emitter.onNext(value) }
             emitter.onComplete()
@@ -120,7 +127,14 @@
 /**
  * Converts the given flow to a cold flowable.
  * The original flow is cancelled when the flowable subscriber is disposed.
+ *
+ * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
+ * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
+ * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
+ * is used, so calls are performed from an arbitrary thread.
  */
+@JvmOverloads // binary compatibility
 @JvmName("from")
 @ExperimentalCoroutinesApi
-public fun <T: Any> Flow<T>.asFlowable(): Flowable<T> = Flowable.fromPublisher(asPublisher())
+public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
+    Flowable.fromPublisher(asPublisher(context))
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowAsFlowableTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowAsFlowableTest.kt
new file mode 100644
index 0000000..a73fee4
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowAsFlowableTest.kt
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.junit.Test
+import org.reactivestreams.*
+import java.util.concurrent.*
+import kotlin.test.*
+
+@Suppress("ReactiveStreamsSubscriberImplementation")
+class FlowAsFlowableTest : TestBase() {
+    @Test
+    fun testUnconfinedDefaultContext() {
+        expect(1)
+        val thread = Thread.currentThread()
+        fun checkThread() {
+            assertSame(thread, Thread.currentThread())
+        }
+        flowOf(42).asFlowable().subscribe(object : Subscriber<Int> {
+            private lateinit var subscription: Subscription
+
+            override fun onSubscribe(s: Subscription) {
+                expect(2)
+                subscription = s
+                subscription.request(2)
+            }
+
+            override fun onNext(t: Int) {
+                checkThread()
+                expect(3)
+                assertEquals(42, t)
+            }
+
+            override fun onComplete() {
+                checkThread()
+                expect(4)
+            }
+
+            override fun onError(t: Throwable?) {
+                expectUnreached()
+            }
+        })
+        finish(5)
+    }
+
+    @Test
+    fun testConfinedContext() {
+        expect(1)
+        val threadName = "FlowAsFlowableTest.testConfinedContext"
+        fun checkThread() {
+            val currentThread = Thread.currentThread()
+            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
+        }
+        val completed = CountDownLatch(1)
+        newSingleThreadContext(threadName).use { dispatcher ->
+            flowOf(42).asFlowable(dispatcher).subscribe(object : Subscriber<Int> {
+                private lateinit var subscription: Subscription
+
+                override fun onSubscribe(s: Subscription) {
+                    expect(2)
+                    subscription = s
+                    subscription.request(2)
+                }
+
+                override fun onNext(t: Int) {
+                    checkThread()
+                    expect(3)
+                    assertEquals(42, t)
+                }
+
+                override fun onComplete() {
+                    checkThread()
+                    expect(4)
+                    completed.countDown()
+                }
+
+                override fun onError(t: Throwable?) {
+                    expectUnreached()
+                }
+            })
+            completed.await()
+        }
+        finish(5)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt
index 50c4ae7..5759f9f 100644
--- a/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt
@@ -4,9 +4,12 @@
 
 package kotlinx.coroutines.rx3
 
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
 import kotlinx.coroutines.*
 import kotlinx.coroutines.flow.*
 import org.junit.Test
+import java.util.concurrent.*
 import kotlin.test.*
 
 class FlowAsObservableTest : TestBase() {
@@ -139,4 +142,70 @@
 
         observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) })
     }
+
+    @Test
+    fun testUnconfinedDefaultContext() {
+        expect(1)
+        val thread = Thread.currentThread()
+        fun checkThread() {
+            assertSame(thread, Thread.currentThread())
+        }
+        flowOf(42).asObservable().subscribe(object : Observer<Int> {
+            override fun onSubscribe(d: Disposable) {
+                expect(2)
+            }
+
+            override fun onNext(t: Int) {
+                checkThread()
+                expect(3)
+                assertEquals(42, t)
+            }
+
+            override fun onComplete() {
+                checkThread()
+                expect(4)
+            }
+
+            override fun onError(t: Throwable) {
+                expectUnreached()
+            }
+        })
+        finish(5)
+    }
+
+    @Test
+    fun testConfinedContext() {
+        expect(1)
+        val threadName = "FlowAsObservableTest.testConfinedContext"
+        fun checkThread() {
+            val currentThread = Thread.currentThread()
+            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
+        }
+        val completed = CountDownLatch(1)
+        newSingleThreadContext(threadName).use { dispatcher ->
+            flowOf(42).asObservable(dispatcher).subscribe(object : Observer<Int> {
+                override fun onSubscribe(d: Disposable) {
+                    expect(2)
+                }
+
+                override fun onNext(t: Int) {
+                    checkThread()
+                    expect(3)
+                    assertEquals(42, t)
+                }
+
+                override fun onComplete() {
+                    checkThread()
+                    expect(4)
+                    completed.countDown()
+                }
+
+                override fun onError(e: Throwable) {
+                    expectUnreached()
+                }
+            })
+            completed.await()
+        }
+        finish(5)
+    }
 }