blob: 0b31efcb8f6265e300348eea952e5d054ccf296f [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
5package kotlinx.coroutines.experimental.rx2
6
Roman Elizarov0f66a6d2017-06-22 14:57:53 +03007import io.reactivex.*
Roman Elizarov331750b2017-02-15 17:59:17 +03008import io.reactivex.disposables.Disposable
Roman Elizarovf5f09832018-05-16 15:10:28 +03009import kotlinx.coroutines.experimental.channels.*
Roman Elizarov331750b2017-02-15 17:59:17 +030010
/**
 * Opens a subscription to this [MaybeSource] and exposes whatever it emits through the returned channel.
 * The returned channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
 */
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> =
    // The channel is itself the observer: subscribe it, then hand it back to the caller.
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
21
/** @suppress **Deprecated**: Left here for binary compatibility */
@Deprecated(message = "Left here for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> MaybeSource<T>.openSubscription(): SubscriptionReceiveChannel<T> {
    // Delegates to the visible overload and narrows the result to the legacy return type.
    val channel: ReceiveChannel<T> = openSubscription()
    return channel as SubscriptionReceiveChannel<T>
}
27
/**
 * Legacy name of [openSubscription], kept so that existing callers continue to compile.
 *
 * @suppress **Deprecated**: Renamed to [openSubscription]
 */
@Deprecated(
    message = "Renamed to `openSubscription`",
    replaceWith = ReplaceWith("openSubscription()")
)
public fun <T> MaybeSource<T>.open(): SubscriptionReceiveChannel<T> {
    val channel: ReceiveChannel<T> = openSubscription()
    return channel as SubscriptionReceiveChannel<T>
}
Roman Elizarov0f66a6d2017-06-22 14:57:53 +030035
/**
 * Opens a subscription to this [ObservableSource] and exposes the elements it emits through the returned channel.
 * The returned channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
 */
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> =
    // The channel is itself the observer: subscribe it, then hand it back to the caller.
    SubscriptionChannel<T>().also { observer -> subscribe(observer) }
46
/** @suppress **Deprecated**: Left here for binary compatibility */
@Deprecated(message = "Left here for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> ObservableSource<T>.openSubscription(): SubscriptionReceiveChannel<T> {
    // Delegates to the visible overload and narrows the result to the legacy return type.
    val channel: ReceiveChannel<T> = openSubscription()
    return channel as SubscriptionReceiveChannel<T>
}
52
/**
 * Legacy name of [openSubscription], kept so that existing callers continue to compile.
 *
 * @suppress **Deprecated**: Renamed to [openSubscription]
 */
@Deprecated(
    message = "Renamed to `openSubscription`",
    replaceWith = ReplaceWith("openSubscription()")
)
public fun <T> ObservableSource<T>.open(): SubscriptionReceiveChannel<T> {
    val channel: ReceiveChannel<T> = openSubscription()
    return channel as SubscriptionReceiveChannel<T>
}
Roman Elizarov0f66a6d2017-06-22 14:57:53 +030060
/**
 * Subscribes to this [ObservableSource] and returns an iterator over the elements it emits.
 *
 * This is a shortcut for `openSubscription().iterator()`. The channel behind the iterator is not
 * exposed, so use [openSubscription] directly when you need to unsubscribe from the source manually.
 */
@Suppress("DeprecatedCallableAddReplaceWith")
@Deprecated(
    message = "This iteration operator for `for (x in source) { ... }` loop is deprecated, " +
        "because it leaves code vulnerable to leaving unclosed subscriptions on exception. " +
        "Use `source.consumeEach { x -> ... }`."
)
public operator fun <T> ObservableSource<T>.iterator(): ChannelIterator<T> {
    val channel = openSubscription()
    return channel.iterator()
}
Roman Elizarov331750b2017-02-15 17:59:17 +030073
/**
 * Subscribes to this [MaybeSource] and performs the specified [action] for each received element.
 *
 * The underlying subscription channel is always cancelled when this function exits: on normal
 * completion, when [action] throws, when the calling coroutine is cancelled while waiting for
 * an element, and when [action] performs a non-local return.
 *
 * @param action callback invoked for every element received from the source.
 */
public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) {
    val channel = openSubscription()
    try {
        for (x in channel) action(x)
    } finally {
        // Must run on every exit path, otherwise a failing action leaves the subscription open.
        channel.cancel()
    }
}
82
/**
 * Subscribes to this [ObservableSource] and performs the specified [action] for each received element.
 *
 * The underlying subscription channel is always cancelled when this function exits: on normal
 * completion, when [action] throws, when the calling coroutine is cancelled while waiting for
 * an element, and when [action] performs a non-local return.
 *
 * @param action callback invoked for every element received from the source.
 */
public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) {
    val channel = openSubscription()
    try {
        for (x in channel) action(x)
    } finally {
        // Must run on every exit path, otherwise a failing action leaves the subscription open.
        channel.cancel()
    }
}
91
/**
 * Overload taking a suspending [action]; it forwards every element to the inline `consumeEach`.
 *
 * @suppress: **Deprecated**: binary compatibility with old code
 */
@Deprecated(message = "binary compatibility with old code", level = DeprecationLevel.HIDDEN)
public suspend fun <T> ObservableSource<T>.consumeEach(action: suspend (T) -> Unit) {
    consumeEach { element -> action(element) }
}
98
/**
 * Channel that is also an Rx [Observer] and [MaybeObserver]: every signal received from the
 * source is translated into the matching channel operation, and closing the channel disposes
 * the Rx subscription.
 */
private class SubscriptionChannel<T> :
    LinkedListChannel<T>(), ReceiveChannel<T>, Observer<T>, MaybeObserver<T>, SubscriptionReceiveChannel<T>
{
    /** Disposable handed over by the source in [onSubscribe]; `null` until then. */
    @Volatile
    var subscription: Disposable? = null

    // AbstractChannel overrides
    override fun afterClose(cause: Throwable?) {
        // Closing the channel (for any reason) releases the upstream subscription, if there is one.
        subscription?.dispose()
    }

    // Observer and MaybeObserver overrides
    override fun onSubscribe(sub: Disposable) {
        subscription = sub
    }

    override fun onSuccess(t: T) {
        offer(t)
        // MaybeObserver.onSuccess is a terminal signal: no onComplete follows it, so the channel
        // has to be closed here or a consumer iterating over it would wait forever.
        close(cause = null)
    }

    override fun onNext(t: T) {
        offer(t)
    }

    override fun onComplete() {
        close(cause = null)
    }

    override fun onError(e: Throwable) {
        close(cause = e)
    }
}