Provide asFlowable and asObservable by their names in binary instead … (#2285)

* Provide asFlowable and asObservable by their names in binary instead of 'from' function to prevent naming clash for Java users.
* Do not provide @JvmOverloads for convenience of Java interop
* Deprecate ReceiveChannel.asObservable by the way

Fixes #2182
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index 264cdad..cf73ef2 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -16,7 +16,7 @@
 
 /**
  * Converts this job to the hot reactive completable that signals
- * with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
+ * with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
  *
  * Every subscriber gets the signal at the same time.
  * Unsubscribing from the resulting completable **does not** affect the original job in any way.
@@ -50,7 +50,7 @@
 
 /**
  * Converts this deferred value to the hot reactive single that signals either
- * [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
+ * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
  *
  * Every subscriber gets the same completion value.
  * Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
@@ -66,21 +66,6 @@
 }
 
 /**
- * Converts a stream of elements received from the channel to the hot reactive observable.
- *
- * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
- * they'll receive values in round-robin way.
- */
-@Deprecated(
-    message = "Deprecated in the favour of Flow",
-    level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.consumeAsFlow().asObservable()")
-)
-public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
-    for (t in this@asObservable)
-        send(t)
-}
-
-/**
  * Transforms given cold [ObservableSource] into cold [Flow].
  *
  * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
@@ -113,8 +98,6 @@
  * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
  * is used, so calls are performed from an arbitrary thread.
  */
-@JvmOverloads // binary compatibility
-@JvmName("from")
 @ExperimentalCoroutinesApi
 public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
     /*
@@ -148,8 +131,29 @@
  * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
  * is used, so calls are performed from an arbitrary thread.
  */
-@JvmOverloads // binary compatibility
-@JvmName("from")
 @ExperimentalCoroutinesApi
 public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
     Flowable.fromPublisher(asPublisher(context))
+
+@Deprecated(
+    message = "Deprecated in the favour of Flow",
+    level = DeprecationLevel.ERROR,
+    replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow")
+) // Deprecated since 1.4.0
+public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
+    for (t in this@asObservable)
+        send(t)
+}
+
+@Suppress("UNUSED") // KT-42513
+@JvmOverloads // binary compatibility
+@JvmName("from")
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
+public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
+    asFlowable(context)
+
+@Suppress("UNUSED") // KT-42513
+@JvmOverloads // binary compatibility
+@JvmName("from")
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
+public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)