blob: a39f597936748ee0467d309229f83ad0f6eb20f6 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
5package kotlinx.coroutines.experimental.reactive
6
Roman Elizarovf5f09832018-05-16 15:10:28 +03007import kotlinx.atomicfu.*
8import kotlinx.coroutines.experimental.channels.*
9import org.reactivestreams.*
Roman Elizarov331750b2017-02-15 17:59:17 +030010
/**
 * Subscribes to this [Publisher] and returns a channel through which the emitted elements can be received.
 *
 * The returned channel has to be [cancelled][ReceiveChannel.cancel] in order to unsubscribe from this publisher.
 *
 * @param request how many items to request from the publisher in advance
 *        (optional; the default of `0` requests elements on demand).
 */
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T> =
    // Create the bridging channel first, then register it as the subscriber of this publisher.
    SubscriptionChannel<T>(request).also { subscriber -> subscribe(subscriber) }
22
/**
 * Hidden overload that keeps the old `SubscriptionReceiveChannel` return type in the binary API.
 *
 * @suppress **Deprecated**: Left here for binary compatibility
 */
@JvmOverloads // for binary compatibility
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Left here for binary compatibility")
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> Publisher<T>.openSubscription(request: Int = 0): SubscriptionReceiveChannel<T> {
    // Delegate to the visible overload and narrow the result to the legacy channel type.
    val channel = openSubscription(request)
    return channel as SubscriptionReceiveChannel<T>
}
29
/**
 * Old name of [openSubscription]; opens a subscription using the default request size.
 *
 * @suppress **Deprecated**: Renamed to [openSubscription]
 */
@Deprecated(
    message = "Renamed to `openSubscription`",
    replaceWith = ReplaceWith("openSubscription()")
)
public fun <T> Publisher<T>.open(): SubscriptionReceiveChannel<T> {
    val channel = openSubscription()
    return channel as SubscriptionReceiveChannel<T>
}
Roman Elizarov0f66a6d2017-06-22 14:57:53 +030037
/**
 * Subscribes to this [Publisher] and returns an iterator over the elements emitted by it.
 *
 * This is a shortcut for `openSubscription().iterator()`. Use [openSubscription] directly when you
 * need to be able to unsubscribe from the publisher manually, because the channel created here is
 * not exposed to the caller.
 */
@Suppress("DeprecatedCallableAddReplaceWith")
@Deprecated(
    message =
        "This iteration operator for `for (x in source) { ... }` loop is deprecated, " +
        "because it leaves code vulnerable to leaving unclosed subscriptions on exception. " +
        "Use `source.consumeEach { x -> ... }`."
)
public operator fun <T> Publisher<T>.iterator() =
    openSubscription().iterator()
Roman Elizarov331750b2017-02-15 17:59:17 +030050
/**
 * Subscribes to this [Publisher] and performs the specified [action] for each received element.
 *
 * The subscription opened by this function is always cancelled before the function exits:
 * after the publisher completes, when the publisher signals an error, when [action] throws,
 * when [action] performs a non-local return, and when iteration is interrupted by any other
 * exception. Any such exception is propagated to the caller after the channel is cancelled.
 *
 * @param action callback invoked with every element, in the order the elements are received.
 */
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) {
    val channel = openSubscription()
    try {
        for (x in channel) action(x)
    } finally {
        // Must run on every exit path; otherwise a failing action would leave the subscription open.
        channel.cancel()
    }
}
59
/**
 * Hidden overload taking a suspending [action]; it simply forwards every element to [action]
 * through the `consumeEach` call in its body.
 *
 * @suppress: **Deprecated**: binary compatibility with old code
 */
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
public suspend fun <T> Publisher<T>.consumeEach(action: suspend (T) -> Unit) =
    consumeEach { element -> action(element) }
66
/**
 * Channel that is also a reactive-streams [Subscriber]: elements delivered through [onNext] are
 * offered to the channel, [onComplete] and [onError] close it, and closing the channel cancels
 * the [Subscription] received in [onSubscribe].
 *
 * @param request how many elements to keep requested from the subscription in advance;
 *        must not be negative.
 */
private class SubscriptionChannel<T>(
    private val request: Int
) : LinkedListChannel<T>(), ReceiveChannel<T>, Subscriber<T>, SubscriptionReceiveChannel<T> {
    init {
        require(request >= 0) { "Invalid request size: $request" }
    }

    // Remains null until onSubscribe stores the subscription handed over by the publisher.
    @Volatile
    private var subscription: Subscription? = null

    // Demand balance: (requested from subscription) - (received) - (enqueued receivers).
    // It may go negative while receivers are already waiting but no subscription has arrived yet.
    private val _requested = atomic(0)

    // ---- AbstractChannel overrides ----

    /** Accounts for a newly enqueued receiver and requests more elements when demand has run out. */
    override fun onReceiveEnqueued() {
        _requested.loop { balance ->
            val active = subscription
            val afterEnqueue = balance - 1
            if (active == null || afterEnqueue >= 0) {
                // Either there is nobody to request from yet, or outstanding demand still covers
                // this receiver -- only the book-keeping is needed.
                if (_requested.compareAndSet(balance, afterEnqueue)) return
                return@loop // lost the race, retry with a fresh value
            }
            // Outstanding demand is exhausted: move the balance to the configured level first ...
            if (balance != request && !_requested.compareAndSet(balance, request)) {
                return@loop // lost the race, retry with a fresh value
            }
            // ... then request the shortfall from the subscription.
            active.request((request - afterEnqueue).toLong())
            return
        }
    }

    /** A receiver left the queue, so it no longer counts against the demand balance. */
    override fun onReceiveDequeued() {
        _requested.incrementAndGet()
    }

    /** Cancels the subscription, when one is present, after the channel has been closed. */
    override fun afterClose(cause: Throwable?) {
        val active = subscription
        if (active != null) active.cancel()
    }

    // ---- Subscriber overrides ----

    /**
     * Stores the subscription and issues the initial request, which also covers receivers that
     * were enqueued before the subscription arrived. Cancels [s] right away when the channel is
     * already closed.
     */
    override fun onSubscribe(s: Subscription) {
        subscription = s
        while (true) { // lock-free loop on _requested
            if (isClosedForSend) {
                s.cancel()
                return
            }
            val balance = _requested.value
            if (balance >= request) return // nothing to fix up
            // Receivers came before the subscription, or the initial request is still pending:
            // raise the balance to the configured level and request the difference.
            if (_requested.compareAndSet(balance, request)) {
                s.request((request - balance).toLong())
                return
            }
            // CAS failed -- go around again
        }
    }

    /** Consumes one unit of demand and hands the element over to the channel. */
    override fun onNext(t: T) {
        _requested.decrementAndGet()
        offer(t)
    }

    /** Closes the channel without a cause. */
    override fun onComplete() {
        close(null)
    }

    /** Closes the channel with [e] as the cause. */
    override fun onError(e: Throwable) {
        close(e)
    }
}
137