Provide asFlowable and asObservable by their names in binary instead … (#2285)

* Provide asFlowable and asObservable by their names in binary instead of 'from' function to prevent naming clash for Java users.
* Do not provide @JvmOverloads for convenience of Java interop
* Deprecate ReceiveChannel.asObservable by the way

Fixes #2182
diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
index 06ddb68..2cf7bc8 100644
--- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
+++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
@@ -30,13 +30,17 @@
 public final class kotlinx/coroutines/rx2/RxConvertKt {
 	public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Completable;
 	public static final fun asFlow (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun asFlowable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
+	public static synthetic fun asFlowable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
 	public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe;
 	public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
+	public static final fun asObservable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
+	public static synthetic fun asObservable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
 	public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
-	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
-	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
-	public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
-	public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
+	public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
+	public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
+	public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
+	public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
 	public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
 	public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
 }
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index 264cdad..cf73ef2 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -16,7 +16,7 @@
 
 /**
  * Converts this job to the hot reactive completable that signals
- * with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
+ * with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
  *
  * Every subscriber gets the signal at the same time.
  * Unsubscribing from the resulting completable **does not** affect the original job in any way.
@@ -50,7 +50,7 @@
 
 /**
  * Converts this deferred value to the hot reactive single that signals either
- * [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
+ * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
  *
  * Every subscriber gets the same completion value.
  * Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
@@ -66,21 +66,6 @@
 }
 
 /**
- * Converts a stream of elements received from the channel to the hot reactive observable.
- *
- * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
- * they'll receive values in round-robin way.
- */
-@Deprecated(
-    message = "Deprecated in the favour of Flow",
-    level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.consumeAsFlow().asObservable()")
-)
-public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
-    for (t in this@asObservable)
-        send(t)
-}
-
-/**
  * Transforms given cold [ObservableSource] into cold [Flow].
  *
  * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
@@ -113,8 +98,6 @@
  * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
  * is used, so calls are performed from an arbitrary thread.
  */
-@JvmOverloads // binary compatibility
-@JvmName("from")
 @ExperimentalCoroutinesApi
 public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
     /*
@@ -148,8 +131,29 @@
  * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
  * is used, so calls are performed from an arbitrary thread.
  */
-@JvmOverloads // binary compatibility
-@JvmName("from")
 @ExperimentalCoroutinesApi
 public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
     Flowable.fromPublisher(asPublisher(context))
+
+@Deprecated(
+    message = "Deprecated in the favour of Flow",
+    level = DeprecationLevel.ERROR,
+    replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow")
+) // Deprecated since 1.4.0
+public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
+    for (t in this@asObservable)
+        send(t)
+}
+
+@Suppress("UNUSED") // KT-42513
+@JvmOverloads // binary compatibility
+@JvmName("from")
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
+public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
+    asFlowable(context)
+
+@Suppress("UNUSED") // KT-42513
+@JvmOverloads // binary compatibility
+@JvmName("from")
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
+public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)
diff --git a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
index a433665..cfc3240 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
@@ -6,6 +6,7 @@
 
 import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
 import org.junit.Assert
 import org.junit.Test
 import kotlin.test.*
@@ -126,7 +127,7 @@
             delay(50)
             send("K")
         }
-        val observable = c.asObservable(Dispatchers.Unconfined)
+        val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined)
         checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) {
             assertEquals("OK", it)
         }
@@ -140,7 +141,7 @@
             delay(50)
             throw TestException("K")
         }
-        val observable = c.asObservable(Dispatchers.Unconfined)
+        val observable = c.consumeAsFlow().asObservable(Dispatchers.Unconfined)
         val single = rxSingle(Dispatchers.Unconfined) {
             var result = ""
             try {
@@ -155,4 +156,4 @@
             assertEquals("OK", it)
         }
     }
-}
\ No newline at end of file
+}
diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
index 22e0e72..540fa76 100644
--- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
@@ -6,6 +6,7 @@
 
 import io.reactivex.*
 import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
 import org.junit.Test
 import org.junit.runner.*
 import org.junit.runners.*
@@ -92,7 +93,7 @@
         assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
         checkNumbers(n, observable)
         val channel = observable.openSubscription()
-        checkNumbers(n, channel.asObservable(ctx(coroutineContext)))
+        checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext)))
         channel.cancel()
     }
 
@@ -131,4 +132,4 @@
         assertEquals(n, last)
     }
 
-}
\ No newline at end of file
+}
diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
index 4f15eda..eb92fd3 100644
--- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
+++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
@@ -26,12 +26,16 @@
 public final class kotlinx/coroutines/rx3/RxConvertKt {
 	public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Completable;
 	public static final fun asFlow (Lio/reactivex/rxjava3/core/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun asFlowable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
+	public static synthetic fun asFlowable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
 	public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Maybe;
+	public static final fun asObservable (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
+	public static synthetic fun asObservable$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
 	public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Single;
-	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
-	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
-	public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
-	public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
+	public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Flowable;
+	public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/rxjava3/core/Observable;
+	public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Flowable;
+	public static final synthetic fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/rxjava3/core/Observable;
 	public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Flowable;
 	public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/rxjava3/core/Observable;
 }
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
index c7ab237..9bb38c0 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
@@ -16,7 +16,7 @@
 
 /**
  * Converts this job to the hot reactive completable that signals
- * with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
+ * with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
  *
  * Every subscriber gets the signal at the same time.
  * Unsubscribing from the resulting completable **does not** affect the original job in any way.
@@ -50,7 +50,7 @@
 
 /**
  * Converts this deferred value to the hot reactive single that signals either
- * [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
+ * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
  *
  * Every subscriber gets the same completion value.
  * Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
@@ -98,8 +98,6 @@
  * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
  * is used, so calls are performed from an arbitrary thread.
  */
-@JvmOverloads // binary compatibility
-@JvmName("from")
 @ExperimentalCoroutinesApi
 public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
     /*
@@ -133,8 +131,19 @@
  * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
  * is used, so calls are performed from an arbitrary thread.
  */
-@JvmOverloads // binary compatibility
-@JvmName("from")
 @ExperimentalCoroutinesApi
 public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
     Flowable.fromPublisher(asPublisher(context))
+
+@Suppress("UNUSED") // KT-42513
+@JvmOverloads // binary compatibility
+@JvmName("from")
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
+public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
+    asFlowable(context)
+
+@Suppress("UNUSED") // KT-42513
+@JvmOverloads // binary compatibility
+@JvmName("from")
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
+public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)