blob: 013f382cf2e707ddb4a82264605bbe46ad9433f7 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
5package kotlinx.coroutines.experimental.rx1
6
7import kotlinx.coroutines.experimental.Deferred
8import kotlinx.coroutines.experimental.Job
9import kotlinx.coroutines.experimental.channels.ReceiveChannel
10import rx.*
11import kotlin.coroutines.experimental.CoroutineContext
12
13/**
14 * Converts this job to the hot reactive completable that signals
15 * with [onCompleted][CompletableSubscriber.onCompleted] when the corresponding job completes.
16 *
17 * Every subscriber gets the signal at the same time.
18 * Unsubscribing from the resulting completable **does not** affect the original job in any way.
19 *
20 * @param context -- the coroutine context from which the resulting completable is going to be signalled
21 */
Roman Elizarov3c3aed72017-03-09 12:31:59 +030022public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
23 this@asCompletable.join()
Roman Elizarov331750b2017-02-15 17:59:17 +030024}
25
26/**
27 * Converts this deferred value to the hot reactive single that signals either
28 * [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
29 *
30 * Every subscriber gets the same completion value.
31 * Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
32 *
33 * @param context -- the coroutine context from which the resulting single is going to be signalled
34 */
Roman Elizarov3c3aed72017-03-09 12:31:59 +030035public fun <T> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle<T>(context) {
36 this@asSingle.await()
Roman Elizarov331750b2017-02-15 17:59:17 +030037}
38
39/**
40 * Converts a stream of elements received from the channel to the hot reactive observable.
41 *
42 * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
43 * they'll receive values in round-robin way.
44 *
45 * @param context -- the coroutine context from which the resulting observable is going to be signalled
46 */
Roman Elizarov3c3aed72017-03-09 12:31:59 +030047public fun <T> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
48 for (t in this@asObservable)
Roman Elizarov331750b2017-02-15 17:59:17 +030049 send(t)
50}
Roman Elizarov3c3aed72017-03-09 12:31:59 +030051
52/**
53 * @suppress **Deprecated**: Renamed to [asCompletable]
54 */
55@Deprecated(message = "Renamed to `asCompletable`",
56 replaceWith = ReplaceWith("asCompletable(context)"))
57public fun Job.toCompletable(context: CoroutineContext): Completable = asCompletable(context)
58
59/**
60 * @suppress **Deprecated**: Renamed to [asSingle]
61 */
62@Deprecated(message = "Renamed to `asSingle`",
63 replaceWith = ReplaceWith("asSingle(context)"))
64public fun <T> Deferred<T>.toSingle(context: CoroutineContext): Single<T> = asSingle(context)
65
66/**
67 * @suppress **Deprecated**: Renamed to [asObservable]
68 */
69@Deprecated(message = "Renamed to `asObservable`",
70 replaceWith = ReplaceWith("asObservable(context)"))
71public fun <T> ReceiveChannel<T>.toObservable(context: CoroutineContext): Observable<T> = asObservable(context)