Support context in Flow.asPublisher and similar methods (#2156)
Fixes #2155
Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index 0be606f..264cdad 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -10,6 +10,7 @@
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
+import org.reactivestreams.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*
@@ -106,15 +107,21 @@
/**
* Converts the given flow to a cold observable.
* The original flow is cancelled when the observable subscriber is disposed.
+ *
+ * An optional [context] can be specified to control the execution context of calls to [Observer] methods.
+ * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or one or more [ThreadContextElement]s to
+ * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
+ * is used, so calls are performed from an arbitrary thread.
*/
+@JvmOverloads // binary compatibility
@JvmName("from")
@ExperimentalCoroutinesApi
-public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create { emitter ->
+public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
* asObservable is already invoked from unconfined
*/
- val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
+ val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) {
try {
collect { value -> emitter.onNext(value) }
emitter.onComplete()
@@ -135,7 +142,14 @@
/**
* Converts the given flow to a cold flowable.
* The original flow is cancelled when the flowable subscriber is disposed.
+ *
+ * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
+ * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or one or more [ThreadContextElement]s to
+ * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
+ * is used, so calls are performed from an arbitrary thread.
*/
+@JvmOverloads // binary compatibility
@JvmName("from")
@ExperimentalCoroutinesApi
-public fun <T: Any> Flow<T>.asFlowable(): Flowable<T> = Flowable.fromPublisher(asPublisher())
+public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
+ Flowable.fromPublisher(asPublisher(context))