blob: 640b51c8c7985e72bc02b882516b42f9bbc5a2cf [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental.reactive
18
Roman Elizarovf5f09832018-05-16 15:10:28 +030019import kotlinx.atomicfu.*
20import kotlinx.coroutines.experimental.channels.*
21import org.reactivestreams.*
Roman Elizarov331750b2017-02-15 17:59:17 +030022
/**
 * Opens a subscription to this [Publisher] and exposes the elements it emits through a channel.
 * To unsubscribe from this publisher, [cancel][ReceiveChannel.cancel] the returned channel.
 *
 * @param request how many items to request from the publisher in advance
 * (optional; by default items are requested on demand).
 */
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T> =
    SubscriptionChannel<T>(request).also { subscribe(it) }
34
/** @suppress **Deprecated**: Retained only for binary compatibility */
@JvmOverloads // for binary compatibility
@Deprecated(message = "Left here for binary compatibility", level = DeprecationLevel.HIDDEN)
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> Publisher<T>.openSubscription(request: Int = 0): SubscriptionReceiveChannel<T> {
    val channel = openSubscription(request)
    return channel as SubscriptionReceiveChannel<T>
}
41
/**
 * @suppress **Deprecated**: Renamed to [openSubscription]
 */
@Deprecated(
    message = "Renamed to `openSubscription`",
    replaceWith = ReplaceWith("openSubscription()")
)
public fun <T> Publisher<T>.open(): SubscriptionReceiveChannel<T> {
    val channel = openSubscription()
    return channel as SubscriptionReceiveChannel<T>
}
Roman Elizarov0f66a6d2017-06-22 14:57:53 +030049
/**
 * Subscribes to this [Publisher] and returns an iterator over the elements it emits.
 *
 * Equivalent to `openSubscription().iterator()`. Use [openSubscription] directly when you need
 * to be able to unsubscribe from the publisher manually.
 */
@Suppress("DeprecatedCallableAddReplaceWith")
@Deprecated(
    message = "This iteration operator for `for (x in source) { ... }` loop is deprecated, " +
        "because it leaves code vulnerable to leaving unclosed subscriptions on exception. " +
        "Use `source.consumeEach { x -> ... }`."
)
public operator fun <T> Publisher<T>.iterator(): ChannelIterator<T> {
    val channel = openSubscription()
    return channel.iterator()
}
Roman Elizarov331750b2017-02-15 17:59:17 +030062
/**
 * Subscribes to this [Publisher] and performs the specified [action] for each received element.
 *
 * The underlying subscription is cancelled when this function exits for any reason: after the
 * loop over the channel ends, when [action] or receiving from the channel throws, and when the
 * inlined [action] lambda performs a non-local return.
 *
 * @param action the callback invoked with every element received from the publisher.
 */
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) {
    val channel = openSubscription()
    try {
        for (x in channel) action(x)
    } finally {
        // Always unsubscribe; without this an exception thrown out of the loop
        // would leave the subscription open.
        channel.cancel()
    }
}
71
/**
 * @suppress: **Deprecated**: kept for binary compatibility with old code
 */
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
public suspend fun <T> Publisher<T>.consumeEach(action: suspend (T) -> Unit) {
    consumeEach { element -> action(element) }
}
78
/**
 * A [LinkedListChannel] that is also a reactive-streams [Subscriber]: elements delivered through
 * [onNext] are offered to the channel, [onComplete] and [onError] close it, and closing the
 * channel cancels the [Subscription] if one has been received.
 *
 * @param request the level that the outstanding-demand counter is topped up to whenever
 * a request is made to the subscription; must be non-negative.
 */
private class SubscriptionChannel<T>(
    private val request: Int
) : LinkedListChannel<T>(), ReceiveChannel<T>, Subscriber<T>, SubscriptionReceiveChannel<T> {
    init {
        require(request >= 0) { "Invalid request size: $request" }
    }

    // Set by onSubscribe; stays null until the publisher hands over a subscription.
    @Volatile
    private var subscription: Subscription? = null

    // Number requested from the subscription, minus the number of elements received,
    // minus the number of enqueued receivers. It can be negative when receivers are
    // already waiting but there is no subscription yet.
    private val _requested = atomic(0)

    // ---- AbstractChannel overrides ----

    override fun onReceiveEnqueued() {
        while (true) { // lock-free loop on _requested
            val current = _requested.value
            val upstream = subscription
            val afterReceiver = current - 1
            if (upstream == null || afterReceiver >= 0) {
                // no subscription yet, or enough already requested: book-keeping only
                if (_requested.compareAndSet(current, afterReceiver)) return
                continue
            }
            // more has to be requested from the subscription: first move the counter to `request`
            if (current != request && !_requested.compareAndSet(current, request)) {
                continue // lost the race, retry with a fresh value
            }
            upstream.request((request - afterReceiver).toLong())
            return
        }
    }

    override fun onReceiveDequeued() {
        _requested.incrementAndGet()
    }

    override fun afterClose(cause: Throwable?) {
        subscription?.cancel()
    }

    // ---- Subscriber overrides ----

    override fun onSubscribe(s: Subscription) {
        subscription = s
        while (true) { // lock-free loop on _requested
            if (isClosedForSend) {
                // the channel was closed already, so drop the subscription right away
                s.cancel()
                return
            }
            val current = _requested.value
            if (current >= request) return // nothing to fix up
            // receivers arrived before the subscription, or the initial request is still pending:
            // raise the counter to `request` and ask the subscription for the difference
            if (_requested.compareAndSet(current, request)) {
                s.request((request - current).toLong())
                return
            }
        }
    }

    override fun onNext(t: T) {
        _requested.decrementAndGet()
        offer(t)
    }

    override fun onComplete() {
        close(cause = null)
    }

    override fun onError(e: Throwable) {
        close(cause = e)
    }
}
149