blob: bd369cad55f9c80562bbd129b710ffd8d035e4d0 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
Roman Elizarov0950dfa2018-07-13 10:33:25 +03005package kotlinx.coroutines.rx2
Roman Elizarov331750b2017-02-15 17:59:17 +03006
Vsevolod Tolstopyatovbbaf99d2018-09-11 15:55:56 +03007import io.reactivex.*
Roman Elizarov0950dfa2018-07-13 10:33:25 +03008import kotlinx.coroutines.*
9import kotlinx.coroutines.channels.*
Vsevolod Tolstopyatov170690f2019-04-09 12:33:57 +030010import kotlinx.coroutines.flow.*
SokolovaMaria1dcfd972019-08-09 17:35:14 +030011import kotlinx.coroutines.reactive.*
Roman Elizarov0950dfa2018-07-13 10:33:25 +030012import kotlin.coroutines.*
Roman Elizarov331750b2017-02-15 17:59:17 +030013
/**
 * Exposes this [Job] as a hot reactive [Completable] that signals
 * [onComplete][CompletableObserver.onComplete] once the corresponding job completes.
 *
 * All subscribers receive the signal at the same time.
 * Disposing a subscription to the resulting completable **does not** affect the original job in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 * in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting completable is going to be signalled
 */
@ExperimentalCoroutinesApi
public fun Job.asCompletable(context: CoroutineContext): Completable {
    // Name the receiver explicitly so it is not confused with the coroutine scope inside the builder.
    val job = this
    return rxCompletable(context) { job.join() }
}
30
/**
 * Exposes this [Deferred] value as a hot reactive [Maybe] that signals one of
 * [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
 *
 * All subscribers observe the same completion value.
 * Disposing a subscription to the resulting maybe **does not** affect the original deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 * in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting maybe is going to be signalled
 */
@ExperimentalCoroutinesApi
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> {
    // Name the receiver explicitly so it is not confused with the coroutine scope inside the builder.
    val deferred = this
    return rxMaybe(context) { deferred.await() }
}
47
/**
 * Exposes this [Deferred] value as a hot reactive [Single] that signals either
 * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
 *
 * All subscribers observe the same completion value.
 * Disposing a subscription to the resulting single **does not** affect the original deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 * in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting single is going to be signalled
 */
@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> {
    // Name the receiver explicitly so it is not confused with the coroutine scope inside the builder.
    val deferred = this
    return rxSingle(context) { deferred.await() }
}
64
/**
 * Turns the stream of elements received from this channel into a hot reactive [Observable].
 *
 * Subscribers receive values from this channel in **fan-out** fashion: if there are multiple subscribers,
 * they'll receive values in a round-robin way.
 *
 * @param context -- the coroutine context passed to the `rxObservable` builder that forwards the elements
 */
@Deprecated(
    message = "Deprecated in the favour of Flow",
    level = DeprecationLevel.WARNING,
    replaceWith = ReplaceWith("this.consumeAsFlow().asObservable()")
)
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> {
    val source = this
    return rxObservable(context) {
        // Forward every element of the channel, in order, until the channel is exhausted.
        val elements = source.iterator()
        while (elements.hasNext()) {
            send(elements.next())
        }
    }
}
Vsevolod Tolstopyatov170690f2019-04-09 12:33:57 +030079
/**
 * Converts the given flow to a cold [Observable].
 * The original flow is cancelled when the observable subscriber is disposed.
 *
 * Values are forwarded through `onNext`; normal completion of the flow, as well as its cancellation,
 * is reported through `onComplete`. Any other failure is offered to `tryOnError` and, if the emitter
 * rejects it, handed to `handleUndeliverableException`.
 */
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable() : Observable<T> {
    val upstream = this
    return Observable.create { emitter ->
        /*
         * ATOMIC start keeps the behaviour of a subscribe+dispose pair stable even when
         * asObservable is itself invoked from an unconfined coroutine.
         */
        val collector = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
            try {
                upstream.collect { value -> emitter.onNext(value) }
                emitter.onComplete()
            } catch (e: Throwable) {
                // 'create' provides a safe emitter, so on* can be called unconditionally here
                // even if the exception was thrown by `onComplete` itself.
                if (e is CancellationException) {
                    emitter.onComplete()
                } else if (!emitter.tryOnError(e)) {
                    handleUndeliverableException(e, coroutineContext)
                }
            }
        }
        // Tie the lifetime of the collecting coroutine to the subscription.
        emitter.setCancellable(RxCancellable(collector))
    }
}
108
/**
 * Converts the given flow to a cold [Flowable] by first adapting it with `asPublisher`
 * and then wrapping that publisher via [Flowable.fromPublisher].
 * The original flow is cancelled when the flowable subscriber is disposed.
 */
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(): Flowable<T> {
    val publisher = asPublisher()
    return Flowable.fromPublisher(publisher)
}