Provide asFlowable and asObservable by their names in binary instead … (#2285)
* Provide asFlowable and asObservable by their names in binary instead of 'from' function to prevent naming clash for Java users.
* Do not provide @JvmOverloads for convenience of Java interop
* Deprecate ReceiveChannel.asObservable by the way
Fixes #2182
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index 264cdad..cf73ef2 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -16,7 +16,7 @@
/**
* Converts this job to the hot reactive completable that signals
- * with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
+ * with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
*
* Every subscriber gets the signal at the same time.
* Unsubscribing from the resulting completable **does not** affect the original job in any way.
@@ -50,7 +50,7 @@
/**
* Converts this deferred value to the hot reactive single that signals either
- * [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
+ * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
*
* Every subscriber gets the same completion value.
* Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
@@ -66,21 +66,6 @@
}
/**
- * Converts a stream of elements received from the channel to the hot reactive observable.
- *
- * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
- * they'll receive values in round-robin way.
- */
-@Deprecated(
- message = "Deprecated in the favour of Flow",
- level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.consumeAsFlow().asObservable()")
-)
-public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
- for (t in this@asObservable)
- send(t)
-}
-
-/**
* Transforms given cold [ObservableSource] into cold [Flow].
*
* The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
@@ -113,8 +98,6 @@
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@JvmOverloads // binary compatibility
-@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
@@ -148,8 +131,29 @@
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@JvmOverloads // binary compatibility
-@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
Flowable.fromPublisher(asPublisher(context))
+
+@Deprecated(
+ message = "Deprecated in the favour of Flow",
+ level = DeprecationLevel.ERROR,
+ replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow")
+) // Deprecated since 1.4.0
+public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
+ for (t in this@asObservable)
+ send(t)
+}
+
+@Suppress("UNUSED") // KT-42513
+@JvmOverloads // binary compatibility
+@JvmName("from")
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
+public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
+ asFlowable(context)
+
+@Suppress("UNUSED") // KT-42513
+@JvmOverloads // binary compatibility
+@JvmName("from")
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
+public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)