blob: 2c093a07d921028de386813802e3fdda17767f0f [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental.channels
18
Vsevolod Tolstopyatov96191342018-04-20 18:13:33 +030019import kotlinx.atomicfu.*
20import kotlinx.coroutines.experimental.internal.*
21import kotlinx.coroutines.experimental.internalAnnotations.*
22import kotlinx.coroutines.experimental.intrinsics.*
23import kotlinx.coroutines.experimental.selects.*
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030024
/**
 * Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
 *
 * Back-to-back sent elements are _conflated_ -- only the most recently sent value is received,
 * while previously sent elements **are lost**.
 * Every subscriber immediately receives the most recently sent element.
 * Sender to this broadcast channel never suspends and [offer] always returns `true`.
 *
 * A secondary constructor can be used to create an instance of this class that already holds a value.
 * This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
 *
 * This implementation is fully lock-free. In this implementation
 * [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
 * number of subscribers.
 */
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
    /**
     * Creates an instance of this class that already holds a value.
     *
     * It is a shortcut to creating an instance with a default constructor and
     * immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
     */
    constructor(value: E) : this() {
        _state.lazySet(State<E>(value, null))
    }

    // Current state of the channel: a State instance while open, a Closed instance after close.
    private val _state = atomic<Any>(INITIAL_STATE) // State | Closed

    // Set to 1 while an offer operation is updating the state (see offerInternal), 0 otherwise.
    private val _updating = atomic(0)

    private companion object {
        // Shared marker for a channel that was closed without a cause.
        @JvmField
        val CLOSED = Closed(null)

        // Placeholder stored in State.value while no element has been sent yet.
        @JvmField
        val UNDEFINED = Symbol("UNDEFINED")

        // State of a freshly created channel: no value and no subscribers.
        @JvmField
        val INITIAL_STATE = State<Any?>(UNDEFINED, null)
    }

    /**
     * Immutable snapshot of an open channel: the most recent value (or [UNDEFINED]) and
     * the current subscribers (`null` when there are none).
     */
    private class State<E>(
        @JvmField val value: Any?, // UNDEFINED | E
        @JvmField val subscribers: Array<Subscriber<E>>?
    )

    /**
     * Terminal state of the channel. [closeCause] is `null` when the channel was closed normally.
     */
    private class Closed(@JvmField val closeCause: Throwable?) {
        // Exception reported to senders: the close cause, or a ClosedSendChannelException when there is none.
        val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)

        // Exception reported by the `value` property: the close cause, or an IllegalStateException when there is none.
        val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
    }

    /**
     * The most recently sent element to this channel.
     *
     * Access to this property throws [IllegalStateException] when this class is constructed without
     * initial value and no value was sent yet or if it was [closed][close] without a cause.
     * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
     */
    @Suppress("UNCHECKED_CAST")
    public val value: E get() {
        // A single read is sufficient: every branch below either returns or throws.
        val state = _state.value
        when (state) {
            is Closed -> throw state.valueException
            is State<*> -> {
                if (state.value === UNDEFINED) throw IllegalStateException("No value")
                return state.value as E
            }
            else -> error("Invalid state $state")
        }
    }

    /**
     * The most recently sent element to this channel or `null` when this class is constructed without
     * initial value and no value was sent yet or if it was [closed][close].
     */
    @Suppress("UNCHECKED_CAST")
    public val valueOrNull: E? get() {
        val state = _state.value
        when (state) {
            is Closed -> return null
            is State<*> -> {
                if (state.value === UNDEFINED) return null
                return state.value as E
            }
            else -> error("Invalid state $state")
        }
    }

    public override val isClosedForSend: Boolean get() = _state.value is Closed
    public override val isFull: Boolean get() = false

    /**
     * Creates a new subscriber and registers it in the current state.
     *
     * When the channel is already closed, the returned subscriber is closed with the same cause.
     * Otherwise the subscriber is offered the current value (if any) before it is registered.
     */
    @Suppress("UNCHECKED_CAST")
    public override fun openSubscription(): ReceiveChannel<E> {
        val subscriber = Subscriber<E>(this)
        _state.loop { state ->
            when (state) {
                is Closed -> {
                    subscriber.close(state.closeCause)
                    return subscriber
                }
                is State<*> -> {
                    // Offer the current value on every attempt, so a retry after a lost CAS
                    // hands the subscriber the value of the state it is being added to.
                    if (state.value !== UNDEFINED)
                        subscriber.offerInternal(state.value as E)
                    val update = State(state.value, addSubscriber((state as State<E>).subscribers, subscriber))
                    if (_state.compareAndSet(state, update))
                        return subscriber
                }
                else -> error("Invalid state $state")
            }
        }
    }

    /**
     * Unregisters [subscriber] from the current state. Does nothing when the channel is already closed.
     */
    @Suppress("UNCHECKED_CAST")
    private fun closeSubscriber(subscriber: Subscriber<E>) {
        _state.loop { state ->
            when (state) {
                is Closed -> return
                is State<*> -> {
                    // A subscriber is handed out by openSubscription only after it was added to this list,
                    // so the list is expected to be non-null while the channel is open.
                    val update = State(state.value, removeSubscriber((state as State<E>).subscribers!!, subscriber))
                    if (_state.compareAndSet(state, update))
                        return
                }
                else -> error("Invalid state $state")
            }
        }
    }

    /**
     * Returns a new array that contains all elements of [list] (if any) followed by [subscriber].
     */
    private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
        if (list == null) return Array(1) { subscriber }
        return list + subscriber
    }

    /**
     * Returns a new array with [subscriber] removed from [list], or `null` when [list] had a single element.
     *
     * @throws IllegalStateException when [subscriber] is not present in [list].
     */
    @Suppress("UNCHECKED_CAST")
    private fun removeSubscriber(list: Array<Subscriber<E>>, subscriber: Subscriber<E>): Array<Subscriber<E>>? {
        val n = list.size
        val i = list.indexOf(subscriber)
        check(i >= 0) { "Subscriber is not registered in this channel" }
        if (n == 1) return null
        val update = arrayOfNulls<Subscriber<E>>(n - 1)
        // Copy the elements before and after index i, skipping the removed subscriber.
        arraycopy(list, 0, update, 0, i)
        arraycopy(list, i + 1, update, i, n - i - 1)
        return update as Array<Subscriber<E>>
    }

    /**
     * Closes this channel with an optional [cause] and closes all current subscribers with the same cause.
     *
     * @return `true` if this invocation closed the channel, `false` if it was already closed.
     */
    @Suppress("UNCHECKED_CAST")
    public override fun close(cause: Throwable?): Boolean {
        _state.loop { state ->
            when (state) {
                is Closed -> return false
                is State<*> -> {
                    // Reuse the shared marker when there is no cause to carry.
                    val update = if (cause == null) CLOSED else Closed(cause)
                    if (_state.compareAndSet(state, update)) {
                        (state as State<E>).subscribers?.forEach { it.close(cause) }
                        return true
                    }
                }
                else -> error("Invalid state $state")
            }
        }
    }

    /**
     * Closes this broadcast channel. Same as [close].
     */
    public override fun cancel(cause: Throwable?): Boolean = close(cause)

    /**
     * Sends the value to all subscribed receivers and stores this value as the most recent state for
     * future subscribers. This implementation never suspends.
     * It throws exception if the channel [isClosedForSend] (see [close] for details).
     */
    public override suspend fun send(element: E) {
        offerInternal(element)?.let { throw it.sendException }
    }

    /**
     * Sends the value to all subscribed receivers and stores this value as the most recent state for
     * future subscribers. This implementation always returns `true`.
     * It throws exception if the channel [isClosedForSend] (see [close] for details).
     */
    public override fun offer(element: E): Boolean {
        offerInternal(element)?.let { throw it.sendException }
        return true
    }

    /**
     * Stores [element] as the current value and offers it to all current subscribers.
     *
     * @return the [Closed] state when the channel is closed, or `null` when the element was stored
     * or was treated as conflated by a concurrent offer.
     */
    @Suppress("UNCHECKED_CAST")
    private fun offerInternal(element: E): Closed? {
        // If some other thread is updating the state in its offer operation we assume that our offer had linearized
        // before that offer (we lost) and that offer overwrote us and conflated our offer.
        if (!_updating.compareAndSet(0, 1)) return null
        try {
            _state.loop { state ->
                when (state) {
                    is Closed -> return state
                    is State<*> -> {
                        val update = State(element, (state as State<E>).subscribers)
                        if (_state.compareAndSet(state, update)) {
                            // Note: Using offerInternal here to ignore the case when this subscriber was
                            // already concurrently closed (assume the close had conflated our offer for this
                            // particular subscriber).
                            state.subscribers?.forEach { it.offerInternal(element) }
                            return null
                        }
                    }
                    else -> error("Invalid state $state")
                }
            }
        } finally {
            _updating.value = 0 // reset the updating flag to zero even when something goes wrong
        }
    }

    public override val onSend: SelectClause2<E, SendChannel<E>>
        get() = object : SelectClause2<E, SendChannel<E>> {
            override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
                registerSelectSend(select, param, block)
            }
        }

    /**
     * Select-clause implementation of send: once [select] is successfully selected, offers [element] and
     * then either resumes [select] with the send exception (channel closed) or starts [block] undispatched.
     */
    private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
        if (!select.trySelect(null)) return
        offerInternal(element)?.let {
            select.resumeSelectCancellableWithException(it.sendException)
            return
        }
        block.startCoroutineUndispatched(receiver = this, completion = select.completion)
    }

    /**
     * Subscription channel handed out by [openSubscription]; it unregisters itself from
     * [broadcastChannel] when it is cancelled.
     */
    private class Subscriber<E>(
        private val broadcastChannel: ConflatedBroadcastChannel<E>
    ) : ConflatedChannel<E>(), ReceiveChannel<E>, SubscriptionReceiveChannel<E> {
        // Unregister from the broadcast channel only when this call actually closed the subscriber.
        override fun cancel(cause: Throwable?): Boolean =
            close(cause).also { closed ->
                if (closed) broadcastChannel.closeSubscriber(this)
            }

        // Re-declared as public so that the enclosing broadcast channel can call it.
        public override fun offerInternal(element: E): Any = super.offerInternal(element)
    }
}
263}