blob: 8257b414dd0c4d62a1821b4fc05db4ab654085c9 [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental.rx1
18
Roman Elizarovf5f09832018-05-16 15:10:28 +030019import kotlinx.atomicfu.*
20import kotlinx.coroutines.experimental.channels.*
21import rx.*
Roman Elizarov331750b2017-02-15 17:59:17 +030022
/**
 * Subscribes to this [Observable] and returns a channel that receives the elements it emits.
 * The returned channel must be [cancelled][ReceiveChannel.cancel] in order to unsubscribe from this observable.
 *
 * @param request how many items to request from the observable in advance (optional, on-demand request by default).
 */
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> Observable<T>.openSubscription(request: Int = 0): ReceiveChannel<T> =
    SubscriptionChannel<T>(request).also { target ->
        val handle = subscribe(target.subscriber)
        target.subscription = handle
        // The channel may have been closed before the handle was stored above; in that case
        // its close hook saw a null subscription, so the unsubscribe has to happen here.
        if (target.isClosedForSend) handle.unsubscribe()
    }
36
/** @suppress **Deprecated**: Left here for binary compatibility */
@JvmOverloads // for binary compatibility
@Deprecated(message = "Left here for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> Observable<T>.openSubscription(request: Int = 0): SubscriptionReceiveChannel<T> {
    // Open the channel through the visible overload, then narrow it to the legacy return type.
    val channel = openSubscription(request)
    return channel as SubscriptionReceiveChannel<T>
}
43
/**
 * @suppress **Deprecated**: Renamed to [openSubscription]
 */
@Deprecated(
    message = "Renamed to `openSubscription`",
    replaceWith = ReplaceWith("openSubscription()")
)
public fun <T> Observable<T>.open(): SubscriptionReceiveChannel<T> {
    // Same as openSubscription() with the default request size, narrowed to the legacy return type.
    val channel = openSubscription()
    return channel as SubscriptionReceiveChannel<T>
}
Roman Elizarov0f66a6d2017-06-22 14:57:53 +030051
/**
 * Subscribes to this [Observable] and returns an iterator to receive elements emitted by it.
 *
 * This is a shortcut for `openSubscription().iterator()`. See [openSubscription] if you need an ability to manually
 * unsubscribe from the observable.
 *
 * @return an iterator over the channel opened by [openSubscription]; nothing in this function
 * cancels that channel, which is why the operator is deprecated in favour of `consumeEach`.
 */
@Suppress("DeprecatedCallableAddReplaceWith")
@Deprecated(message =
"This iteration operator for `for (x in source) { ... }` loop is deprecated, " +
    "because it leaves code vulnerable to leaving unclosed subscriptions on exception. " +
    "Use `source.consumeEach { x -> ... }`.")
public operator fun <T> Observable<T>.iterator(): ChannelIterator<T> = openSubscription().iterator()
Roman Elizarov331750b2017-02-15 17:59:17 +030064
/**
 * Subscribes to this [Observable] and performs the specified action for each received element.
 *
 * The channel opened for the iteration is cancelled when this function exits, whether it completes
 * normally, [action] throws, receiving fails, or [action] performs a non-local return.
 *
 * @param action callback invoked for every element received from the observable.
 */
public suspend inline fun <T> Observable<T>.consumeEach(action: (T) -> Unit) {
    val channel = openSubscription()
    try {
        for (x in channel) action(x)
    } finally {
        // Must run on every exit path: without it an exception (or cancellation) inside the loop
        // would skip the cancel and leave the subscription open.
        channel.cancel()
    }
}
73
/**
 * @suppress: **Deprecated**: binary compatibility with old code
 */
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
public suspend fun <T> Observable<T>.consumeEach(action: suspend (T) -> Unit) {
    // Forward every element to the suspending callback through the visible consumeEach overload.
    consumeEach { element -> action(element) }
}
80
/**
 * Channel that is fed by an Rx [Subscriber] and keeps track of how many items still have to be
 * requested from the upstream subscription.
 *
 * @param request number of items requested up front and used as the batch size when the demand
 * counter is reset; must not be negative.
 */
private class SubscriptionChannel<T>(
    private val request: Int
) : LinkedListChannel<T>(), ReceiveChannel<T>, SubscriptionReceiveChannel<T> {
    init {
        require(request >= 0) { "Invalid request size: $request" }
    }

    /** Subscriber that forwards upstream signals into this channel. */
    @JvmField
    val subscriber: ChannelSubscriber = ChannelSubscriber(request)

    /** Upstream subscription handle; stays `null` until the subscribing code assigns it. */
    @Volatile
    @JvmField
    var subscription: Subscription? = null

    // Demand balance: requested from subscription, minus number of received elements,
    // minus number of enqueued receivers.
    private val _requested = atomic(request)

    // AbstractChannel overrides

    /** Accounts for a newly enqueued receiver, requesting more items upstream when the balance runs out. */
    override fun onReceiveEnqueued() {
        _requested.loop { current ->
            val remaining = current - 1
            if (remaining >= 0) {
                // Enough outstanding demand: just do the book-keeping.
                if (_requested.compareAndSet(current, remaining)) return
                return@loop // lost the race, re-read the counter and retry
            }
            // Demand is exhausted: bring the counter back to the configured batch size
            // (no CAS is needed when it already holds that value) and only then ask for more.
            val counterReset = current == request || _requested.compareAndSet(current, request)
            if (counterReset) {
                subscriber.makeRequest((request - remaining).toLong())
                return
            }
            // CAS failed: fall through so that the loop retries with a fresh value.
        }
    }

    /** Returns the demand slot of a receiver that has been removed from the queue. */
    override fun onReceiveDequeued() {
        _requested.incrementAndGet()
    }

    /** Unsubscribes from upstream once the channel is closed, if the handle is already known. */
    override fun afterClose(cause: Throwable?) {
        val active = subscription ?: return
        active.unsubscribe()
    }

    /**
     * Rx subscriber that pushes every upstream signal into the enclosing channel.
     *
     * @param initialDemand number of items requested in [onStart].
     */
    inner class ChannelSubscriber(private val initialDemand: Int): Subscriber<T>() {
        /** Requests [n] more items from upstream on behalf of the channel. */
        fun makeRequest(n: Long) {
            request(n)
        }

        override fun onStart() {
            // init backpressure
            request(initialDemand.toLong())
        }

        override fun onNext(t: T) {
            // One requested item has arrived: lower the balance before handing it to the channel.
            _requested.decrementAndGet()
            offer(t)
        }

        override fun onCompleted() {
            close(cause = null)
        }

        override fun onError(e: Throwable) {
            close(cause = e)
        }
    }
}