Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 1 | /* |
Roman Elizarov | 1f74a2d | 2018-06-29 19:19:45 +0300 | [diff] [blame^] | 2 | * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 3 | */ |
| 4 | |
| 5 | package kotlinx.coroutines.experimental.reactive |
| 6 | |
Roman Elizarov | f5f0983 | 2018-05-16 15:10:28 +0300 | [diff] [blame] | 7 | import kotlinx.atomicfu.* |
| 8 | import kotlinx.coroutines.experimental.channels.* |
| 9 | import org.reactivestreams.* |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 10 | |
| 11 | /** |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 12 | * Subscribes to this [Publisher] and returns a channel to receive elements emitted by it. |
Marko Devcic | 1d6230a | 2018-04-04 20:13:08 +0200 | [diff] [blame] | 13 | * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher. |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 14 | * @param request how many items to request from publisher in advance (optional, on-demand request by default). |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 15 | */ |
Roman Elizarov | f5f0983 | 2018-05-16 15:10:28 +0300 | [diff] [blame] | 16 | @Suppress("CONFLICTING_OVERLOADS") |
Marko Devcic | 1d6230a | 2018-04-04 20:13:08 +0200 | [diff] [blame] | 17 | public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T> { |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 18 | val channel = SubscriptionChannel<T>(request) |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 19 | subscribe(channel) |
| 20 | return channel |
| 21 | } |
| 22 | |
Roman Elizarov | f5f0983 | 2018-05-16 15:10:28 +0300 | [diff] [blame] | 23 | /** @suppress **Deprecated**: Left here for binary compatibility */ |
| 24 | @JvmOverloads // for binary compatibility |
| 25 | @Deprecated(level = DeprecationLevel.HIDDEN, message = "Left here for binary compatibility") |
| 26 | @Suppress("CONFLICTING_OVERLOADS") |
| 27 | public fun <T> Publisher<T>.openSubscription(request: Int = 0): SubscriptionReceiveChannel<T> = |
| 28 | openSubscription(request) as SubscriptionReceiveChannel<T> |
| 29 | |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 30 | /** |
Roman Elizarov | 0f66a6d | 2017-06-22 14:57:53 +0300 | [diff] [blame] | 31 | * @suppress **Deprecated**: Renamed to [openSubscription] |
| 32 | */ |
| 33 | @Deprecated(message = "Renamed to `openSubscription`", |
| 34 | replaceWith = ReplaceWith("openSubscription()")) |
Roman Elizarov | f5f0983 | 2018-05-16 15:10:28 +0300 | [diff] [blame] | 35 | public fun <T> Publisher<T>.open(): SubscriptionReceiveChannel<T> = |
| 36 | openSubscription() as SubscriptionReceiveChannel<T> |
Roman Elizarov | 0f66a6d | 2017-06-22 14:57:53 +0300 | [diff] [blame] | 37 | |
| 38 | /** |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 39 | * Subscribes to this [Publisher] and returns an iterator to receive elements emitted by it. |
| 40 | * |
Roman Elizarov | 0f66a6d | 2017-06-22 14:57:53 +0300 | [diff] [blame] | 41 | * This is a shortcut for `open().iterator()`. See [openSubscription] if you need an ability to manually |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 42 | * unsubscribe from the observable. |
| 43 | */ |
Roman Elizarov | 86349be | 2017-03-17 16:47:37 +0300 | [diff] [blame] | 44 | @Suppress("DeprecatedCallableAddReplaceWith") |
| 45 | @Deprecated(message = |
| 46 | "This iteration operator for `for (x in source) { ... }` loop is deprecated, " + |
| 47 | "because it leaves code vulnerable to leaving unclosed subscriptions on exception. " + |
| 48 | "Use `source.consumeEach { x -> ... }`.") |
Roman Elizarov | 0f66a6d | 2017-06-22 14:57:53 +0300 | [diff] [blame] | 49 | public operator fun <T> Publisher<T>.iterator() = openSubscription().iterator() |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 50 | |
Roman Elizarov | 86349be | 2017-03-17 16:47:37 +0300 | [diff] [blame] | 51 | /** |
| 52 | * Subscribes to this [Publisher] and performs the specified action for each received element. |
| 53 | */ |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 54 | public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) { |
Marko Devcic | 1d6230a | 2018-04-04 20:13:08 +0200 | [diff] [blame] | 55 | val channel = openSubscription() |
| 56 | for (x in channel) action(x) |
| 57 | channel.cancel() |
Roman Elizarov | 86349be | 2017-03-17 16:47:37 +0300 | [diff] [blame] | 58 | } |
| 59 | |
Roman Elizarov | 537c359 | 2017-08-16 19:04:31 +0300 | [diff] [blame] | 60 | /** |
| 61 | * @suppress: **Deprecated**: binary compatibility with old code |
| 62 | */ |
| 63 | @Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN) |
| 64 | public suspend fun <T> Publisher<T>.consumeEach(action: suspend (T) -> Unit) = |
| 65 | consumeEach { action(it) } |
| 66 | |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 67 | private class SubscriptionChannel<T>( |
| 68 | private val request: Int |
Roman Elizarov | f5f0983 | 2018-05-16 15:10:28 +0300 | [diff] [blame] | 69 | ) : LinkedListChannel<T>(), ReceiveChannel<T>, Subscriber<T>, SubscriptionReceiveChannel<T> { |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 70 | init { |
| 71 | require(request >= 0) { "Invalid request size: $request" } |
| 72 | } |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 73 | |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 74 | @Volatile |
| 75 | private var subscription: Subscription? = null |
| 76 | |
| 77 | // requested from subscription minus number of received minus number of enqueued receivers, |
| 78 | // can be negative if we have receivers, but no subscription yet |
| 79 | private val _requested = atomic(0) |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 80 | |
| 81 | // AbstractChannel overrides |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 82 | override fun onReceiveEnqueued() { |
| 83 | _requested.loop { wasRequested -> |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 84 | val subscription = this.subscription |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 85 | val needRequested = wasRequested - 1 |
| 86 | if (subscription != null && needRequested < 0) { // need to request more from subscription |
| 87 | // try to fixup by making request |
| 88 | if (wasRequested != request && !_requested.compareAndSet(wasRequested, request)) |
| 89 | return@loop // continue looping if failed |
| 90 | subscription.request((request - needRequested).toLong()) |
| 91 | return |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 92 | } |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 93 | // just do book-keeping |
| 94 | if (_requested.compareAndSet(wasRequested, needRequested)) return |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 95 | } |
| 96 | } |
| 97 | |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 98 | override fun onReceiveDequeued() { |
| 99 | _requested.incrementAndGet() |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 100 | } |
| 101 | |
| 102 | override fun afterClose(cause: Throwable?) { |
| 103 | subscription?.cancel() |
| 104 | } |
| 105 | |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 106 | // Subscriber overrides |
| 107 | override fun onSubscribe(s: Subscription) { |
| 108 | subscription = s |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 109 | while (true) { // lock-free loop on _requested |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 110 | if (isClosedForSend) { |
| 111 | s.cancel() |
| 112 | return |
| 113 | } |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 114 | val wasRequested = _requested.value |
| 115 | if (wasRequested >= request) return // ok -- normal story |
| 116 | // otherwise, receivers came before we had subscription or need to make initial request |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 117 | // try to fixup by making request |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 118 | if (!_requested.compareAndSet(wasRequested, request)) continue |
| 119 | s.request((request - wasRequested).toLong()) |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 120 | return |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | override fun onNext(t: T) { |
Roman Elizarov | 3e342e3 | 2018-01-13 20:05:51 +0300 | [diff] [blame] | 125 | _requested.decrementAndGet() |
Roman Elizarov | 331750b | 2017-02-15 17:59:17 +0300 | [diff] [blame] | 126 | offer(t) |
| 127 | } |
| 128 | |
| 129 | override fun onComplete() { |
| 130 | close(cause = null) |
| 131 | } |
| 132 | |
| 133 | override fun onError(e: Throwable) { |
| 134 | close(cause = e) |
| 135 | } |
| 136 | } |
| 137 | |