Provide asFlowable and asObservable by their names in binary instead of 'from' (#2285)
* Expose asFlowable and asObservable under their own names in the binary API instead of the 'from' function, to prevent a naming clash for Java users.
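* Do not add @JvmOverloads (a Java-interop convenience) to the public asFlowable and asObservable declarations; it is kept only on the hidden 'from' functions for binary compatibility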
* Also raise the deprecation of ReceiveChannel.asObservable from WARNING to ERROR level
Fixes #2182
diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
index 06ddb68..2cf7bc8 100644
--- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
+++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
@@ -30,13 +30,17 @@
public final class kotlinx/coroutines/rx2/RxConvertKt {
public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Completable;
public static final fun asFlow (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun asFlowable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
+ public static synthetic fun asFlowable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe;
public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
+ public static final fun asObservable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
+ public static synthetic fun asObservable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
- public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
- public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
- public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
- public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
+ public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
+ public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
+ public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
+ public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index 264cdad..cf73ef2 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -16,7 +16,7 @@
/**
* Converts this job to the hot reactive completable that signals
- * with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
+ * with [onComplete][CompletableObserver.onComplete] when the corresponding job completes.
*
* Every subscriber gets the signal at the same time.
* Unsubscribing from the resulting completable **does not** affect the original job in any way.
@@ -50,7 +50,7 @@
/**
* Converts this deferred value to the hot reactive single that signals either
- * [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
+ * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
*
* Every subscriber gets the same completion value.
* Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
@@ -66,21 +66,6 @@
}
/**
- * Converts a stream of elements received from the channel to the hot reactive observable.
- *
- * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
- * they'll receive values in round-robin way.
- */
-@Deprecated(
- message = "Deprecated in the favour of Flow",
- level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.consumeAsFlow().asObservable()")
-)
-public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
- for (t in this@asObservable)
- send(t)
-}
-
-/**
* Transforms given cold [ObservableSource] into cold [Flow].
*
* The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
@@ -113,8 +98,6 @@
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@JvmOverloads // binary compatibility
-@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
@@ -148,8 +131,29 @@
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@JvmOverloads // binary compatibility
-@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
Flowable.fromPublisher(asPublisher(context))
+
+@Deprecated(
+ message = "Deprecated in the favour of Flow",
+ level = DeprecationLevel.ERROR,
+ replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow")
+) // Deprecated since 1.4.0
+public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
+ for (t in this@asObservable)
+ send(t)
+}
+
+@Suppress("UNUSED") // KT-42513
+@JvmOverloads // binary compatibility
+@JvmName("from")
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
+public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
+ asFlowable(context)
+
+@Suppress("UNUSED") // KT-42513
+@JvmOverloads // binary compatibility
+@JvmName("from")
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
+public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)
diff --git a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
index a433665..cfc3240 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
@@ -6,6 +6,7 @@
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
import org.junit.Assert
import org.junit.Test
import kotlin.test.*
@@ -126,7 +127,7 @@
delay(50)
send("K")
}
- val observable = c.asObservable(Dispatchers.Unconfined)
+ val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined)
checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) {
assertEquals("OK", it)
}
@@ -140,7 +141,7 @@
delay(50)
throw TestException("K")
}
- val observable = c.asObservable(Dispatchers.Unconfined)
+ val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined)
val single = rxSingle(Dispatchers.Unconfined) {
var result = ""
try {
@@ -155,4 +156,4 @@
assertEquals("OK", it)
}
}
-}
\ No newline at end of file
+}
diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
index 22e0e72..540fa76 100644
--- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
@@ -6,6 +6,7 @@
import io.reactivex.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
@@ -92,7 +93,7 @@
assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
checkNumbers(n, observable)
val channel = observable.openSubscription()
- checkNumbers(n, channel.asObservable(ctx(coroutineContext)))
+ checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext)))
channel.cancel()
}
@@ -131,4 +132,4 @@
assertEquals(n, last)
}
-}
\ No newline at end of file
+}
diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
index 4f15eda..eb92fd3 100644
--- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
+++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
@@ -26,12 +26,16 @@
public final class kotlinx/coroutines/rx3/RxConvertKt {
public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Completable;
public static final fun asFlow (Lio/reactivex/rxjava3/core/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun asFlowable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
+ public static synthetic fun asFlowable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Maybe;
+ public static final fun asObservable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
+ public static synthetic fun asObservable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Single;
- public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
- public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
- public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
- public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
+ public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
+ public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
+ public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
+ public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
index c7ab237..9bb38c0 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
@@ -16,7 +16,7 @@
/**
* Converts this job to the hot reactive completable that signals
- * with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
+ * with [onComplete][CompletableObserver.onComplete] when the corresponding job completes.
*
* Every subscriber gets the signal at the same time.
* Unsubscribing from the resulting completable **does not** affect the original job in any way.
@@ -50,7 +50,7 @@
/**
* Converts this deferred value to the hot reactive single that signals either
- * [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
+ * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
*
* Every subscriber gets the same completion value.
* Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
@@ -98,8 +98,6 @@
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@JvmOverloads // binary compatibility
-@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
@@ -133,8 +131,19 @@
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@JvmOverloads // binary compatibility
-@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
Flowable.fromPublisher(asPublisher(context))
+
+@Suppress("UNUSED") // KT-42513
+@JvmOverloads // binary compatibility
+@JvmName("from")
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
+public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
+ asFlowable(context)
+
+@Suppress("UNUSED") // KT-42513
+@JvmOverloads // binary compatibility
+@JvmName("from")
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
+public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)