Support context in Flow.asPublisher and similar methods (#2156)


Fixes #2155

Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index 0be606f..264cdad 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -10,6 +10,7 @@
 import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.flow.*
 import kotlinx.coroutines.reactive.*
+import org.reactivestreams.*
 import java.util.concurrent.atomic.*
 import kotlin.coroutines.*
 
@@ -106,15 +107,21 @@
 /**
  * Converts the given flow to a cold observable.
  * The original flow is cancelled when the observable subscriber is disposed.
+ *
+ * An optional [context] can be specified to control the execution context of calls to [Observer] methods.
+ * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or add [ThreadContextElement] instances
+ * to inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
+ * is used, so calls are performed from an arbitrary thread.
  */
+@JvmOverloads // binary compatibility
 @JvmName("from")
 @ExperimentalCoroutinesApi
-public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create { emitter ->
+public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
     /*
      * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
      * asObservable is already invoked from unconfined
      */
-    val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
+    val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) {
         try {
             collect { value -> emitter.onNext(value) }
             emitter.onComplete()
@@ -135,7 +142,14 @@
 /**
  * Converts the given flow to a cold flowable.
  * The original flow is cancelled when the flowable subscriber is disposed.
+ *
+ * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
+ * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or add [ThreadContextElement] instances
+ * to inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
+ * is used, so calls are performed from an arbitrary thread.
  */
+@JvmOverloads // binary compatibility
 @JvmName("from")
 @ExperimentalCoroutinesApi
-public fun <T: Any> Flow<T>.asFlowable(): Flowable<T> = Flowable.fromPublisher(asPublisher())
+public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
+    Flowable.fromPublisher(asPublisher(context))