blob: 41c82ed0e83ee6a30d51478b03c7374887dc932f [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Aurimas Liutikasa2d34c52021-05-12 21:56:16 +00002 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
Roman Elizarov0950dfa2018-07-13 10:33:25 +03005package kotlinx.coroutines.rx2
Roman Elizarov331750b2017-02-15 17:59:17 +03006
Vsevolod Tolstopyatovbbaf99d2018-09-11 15:55:56 +03007import io.reactivex.*
Marek Langiewiczd831a862020-02-23 10:12:46 +01008import io.reactivex.disposables.*
Roman Elizarov0950dfa2018-07-13 10:33:25 +03009import kotlinx.coroutines.*
10import kotlinx.coroutines.channels.*
Vsevolod Tolstopyatov170690f2019-04-09 12:33:57 +030011import kotlinx.coroutines.flow.*
SokolovaMaria1dcfd972019-08-09 17:35:14 +030012import kotlinx.coroutines.reactive.*
Roman Elizarov1a6beba2020-08-10 18:12:22 +030013import org.reactivestreams.*
Marek Langiewiczd831a862020-02-23 10:12:46 +010014import java.util.concurrent.atomic.*
Roman Elizarov0950dfa2018-07-13 10:33:25 +030015import kotlin.coroutines.*
Roman Elizarov331750b2017-02-15 17:59:17 +030016
17/**
18 * Converts this job to the hot reactive completable that signals
Vsevolod Tolstopyatov448106a2020-10-08 02:01:09 -070019 * with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
Roman Elizarov331750b2017-02-15 17:59:17 +030020 *
21 * Every subscriber gets the signal at the same time.
22 * Unsubscribing from the resulting completable **does not** affect the original job in any way.
23 *
Roman Elizarov27b8f452018-09-20 21:23:41 +030024 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
25 * in the future to account for the concept of structured concurrency.
26 *
Roman Elizarov331750b2017-02-15 17:59:17 +030027 * @param context -- the coroutine context from which the resulting completable is going to be signalled
28 */
Roman Elizarov27b8f452018-09-20 21:23:41 +030029@ExperimentalCoroutinesApi
Vsevolod Tolstopyatovd100a3f2019-07-16 16:14:35 +030030public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
Roman Elizarov3c3aed72017-03-09 12:31:59 +030031 this@asCompletable.join()
Roman Elizarov331750b2017-02-15 17:59:17 +030032}
33
34/**
Konrad Kamińskid6bb1482017-04-07 09:26:40 +020035 * Converts this deferred value to the hot reactive maybe that signals
36 * [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
37 *
38 * Every subscriber gets the same completion value.
39 * Unsubscribing from the resulting maybe **does not** affect the original deferred value in any way.
40 *
Roman Elizarov27b8f452018-09-20 21:23:41 +030041 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
42 * in the future to account for the concept of structured concurrency.
43 *
Konrad Kamińskid6bb1482017-04-07 09:26:40 +020044 * @param context -- the coroutine context from which the resulting maybe is going to be signalled
45 */
Roman Elizarov27b8f452018-09-20 21:23:41 +030046@ExperimentalCoroutinesApi
Vsevolod Tolstopyatovd100a3f2019-07-16 16:14:35 +030047public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
Konrad Kamińskid6bb1482017-04-07 09:26:40 +020048 this@asMaybe.await()
49}
50
51/**
Roman Elizarov331750b2017-02-15 17:59:17 +030052 * Converts this deferred value to the hot reactive single that signals either
Vsevolod Tolstopyatov448106a2020-10-08 02:01:09 -070053 * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
Roman Elizarov331750b2017-02-15 17:59:17 +030054 *
55 * Every subscriber gets the same completion value.
56 * Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
57 *
Roman Elizarov27b8f452018-09-20 21:23:41 +030058 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
59 * in the future to account for the concept of structured concurrency.
60 *
Roman Elizarov331750b2017-02-15 17:59:17 +030061 * @param context -- the coroutine context from which the resulting single is going to be signalled
62 */
Roman Elizarov27b8f452018-09-20 21:23:41 +030063@ExperimentalCoroutinesApi
Vsevolod Tolstopyatovd100a3f2019-07-16 16:14:35 +030064public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) {
Roman Elizarov3c3aed72017-03-09 12:31:59 +030065 this@asSingle.await()
Roman Elizarov331750b2017-02-15 17:59:17 +030066}
67
68/**
Marek Langiewiczd831a862020-02-23 10:12:46 +010069 * Transforms given cold [ObservableSource] into cold [Flow].
70 *
71 * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
72 * is applied to the resulting flow.
73 *
74 * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
75 * resulting flow to specify a user-defined value and to control what happens when data is produced faster
76 * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
77 */
78@ExperimentalCoroutinesApi
79public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
80 val disposableRef = AtomicReference<Disposable>()
81 val observer = object : Observer<T> {
82 override fun onComplete() { close() }
83 override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
Vsevolod Tolstopyatovee780902020-10-23 08:20:31 -070084 override fun onNext(t: T) {
85 try {
86 sendBlocking(t)
87 } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
88 // Is handled by the downstream flow
89 }
90 }
Marek Langiewiczd831a862020-02-23 10:12:46 +010091 override fun onError(e: Throwable) { close(e) }
92 }
93
94 subscribe(observer)
Vsevolod Tolstopyatov8d8a8fb2020-03-04 23:33:18 +030095 awaitClose { disposableRef.getAndSet(Disposables.disposed())?.dispose() }
Marek Langiewiczd831a862020-02-23 10:12:46 +010096}
97
98/**
Vsevolod Tolstopyatov170690f2019-04-09 12:33:57 +030099 * Converts the given flow to a cold observable.
SokolovaMaria1dcfd972019-08-09 17:35:14 +0300100 * The original flow is cancelled when the observable subscriber is disposed.
Roman Elizarov1a6beba2020-08-10 18:12:22 +0300101 *
102 * An optional [context] can be specified to control the execution context of calls to [Observer] methods.
103 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
104 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
105 * is used, so calls are performed from an arbitrary thread.
Vsevolod Tolstopyatov170690f2019-04-09 12:33:57 +0300106 */
Vsevolod Tolstopyatov15c7d0f2019-06-06 11:47:19 +0300107@ExperimentalCoroutinesApi
Roman Elizarov1a6beba2020-08-10 18:12:22 +0300108public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
Vsevolod Tolstopyatov170690f2019-04-09 12:33:57 +0300109 /*
110 * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
111 * asObservable is already invoked from unconfined
112 */
Roman Elizarov1a6beba2020-08-10 18:12:22 +0300113 val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) {
Vsevolod Tolstopyatov170690f2019-04-09 12:33:57 +0300114 try {
115 collect { value -> emitter.onNext(value) }
116 emitter.onComplete()
117 } catch (e: Throwable) {
118 // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
Vsevolod Tolstopyatova930b0c2019-12-05 19:33:44 +0300119 if (e !is CancellationException) {
120 if (!emitter.tryOnError(e)) {
121 handleUndeliverableException(e, coroutineContext)
122 }
123 } else {
124 emitter.onComplete()
125 }
Vsevolod Tolstopyatov170690f2019-04-09 12:33:57 +0300126 }
127 }
128 emitter.setCancellable(RxCancellable(job))
129}
130
131/**
SokolovaMaria1dcfd972019-08-09 17:35:14 +0300132 * Converts the given flow to a cold flowable.
133 * The original flow is cancelled when the flowable subscriber is disposed.
Roman Elizarov1a6beba2020-08-10 18:12:22 +0300134 *
135 * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
136 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
137 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
138 * is used, so calls are performed from an arbitrary thread.
Vsevolod Tolstopyatov170690f2019-04-09 12:33:57 +0300139 */
Vsevolod Tolstopyatov15c7d0f2019-06-06 11:47:19 +0300140@ExperimentalCoroutinesApi
Roman Elizarov1a6beba2020-08-10 18:12:22 +0300141public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
142 Flowable.fromPublisher(asPublisher(context))
Vsevolod Tolstopyatov448106a2020-10-08 02:01:09 -0700143
144@Deprecated(
145 message = "Deprecated in the favour of Flow",
146 level = DeprecationLevel.ERROR,
147 replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow")
148) // Deprecated since 1.4.0
149public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
150 for (t in this@asObservable)
151 send(t)
152}
153
154@Suppress("UNUSED") // KT-42513
155@JvmOverloads // binary compatibility
156@JvmName("from")
157@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
158public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
159 asFlowable(context)
160
161@Suppress("UNUSED") // KT-42513
162@JvmOverloads // binary compatibility
163@JvmName("from")
164@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
165public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)