blob: 7d79bed4b1e017df3e15cb369f3ba7d4b89ea289 [file] [log] [blame]
Roman Elizarove3aa8ff2017-04-27 19:16:40 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarove3aa8ff2017-04-27 19:16:40 +03003 */
4
5package kotlinx.coroutines.experimental.channels
6
Vsevolod Tolstopyatov96191342018-04-20 18:13:33 +03007import kotlinx.coroutines.experimental.internal.*
8import kotlinx.coroutines.experimental.internalAnnotations.*
Roman Elizarove4b6f092018-03-06 13:37:42 +03009import kotlinx.coroutines.experimental.selects.*
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030010
11/**
12 * Broadcast channel with array buffer of a fixed [capacity].
Roman Elizarov921b0cf2017-06-22 14:36:55 +030013 * Sender suspends only when buffer is full due to one of the receives being slow to consume and
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030014 * receiver suspends only when buffer is empty.
15 *
Roman Elizarov89f8ff72018-03-14 13:39:03 +030016 * **Note**, that elements that are sent to this channel while there are no
17 * [openSubscription] subscribers are immediately lost.
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030018 *
Roman Elizarov36387472017-07-21 18:37:49 +030019 * This channel is created by `BroadcastChannel(capacity)` factory function invocation.
20 *
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030021 * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
22 * The lock at each subscription is also used to manage concurrent attempts to receive from the same subscriber.
23 * The lists of suspended senders or receivers are lock-free.
24 */
25class ArrayBroadcastChannel<E>(
26 /**
27 * Buffer capacity.
28 */
29 val capacity: Int
30) : AbstractSendChannel<E>(), BroadcastChannel<E> {
31 init {
Roman Elizarov8385ec92017-05-11 18:32:52 +030032 require(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030033 }
34
Vsevolod Tolstopyatov167ca632018-06-29 11:49:00 +030035 /*
36 * Writes to buffer are guarded by bufferLock, but reads from buffer are concurrent with writes
37 * - Write element to buffer then write "tail" (volatile)
38 * - Read "tail" (volatile), then read element from buffer
39 * So read/writes to buffer need not be volatile
40 */
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030041 private val bufferLock = ReentrantLock()
Vsevolod Tolstopyatov167ca632018-06-29 11:49:00 +030042 private val buffer = arrayOfNulls<Any?>(capacity)
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030043
44 // head & tail are Long (64 bits) and we assume that they never wrap around
45 // head, tail, and size are guarded by bufferLock
46 @Volatile
47 private var head: Long = 0 // do modulo on use of head
48 @Volatile
49 private var tail: Long = 0 // do modulo on use of tail
50 @Volatile
51 private var size: Int = 0
52
Vsevolod Tolstopyatov167ca632018-06-29 11:49:00 +030053 private val subscribers = subscriberList<Subscriber<E>>()
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030054
55 override val isBufferAlwaysFull: Boolean get() = false
56 override val isBufferFull: Boolean get() = size >= capacity
57
Roman Elizarov89f8ff72018-03-14 13:39:03 +030058 public override fun openSubscription(): ReceiveChannel<E> =
Roman Elizarov9525f3c2017-10-27 12:11:55 +030059 Subscriber(this).also {
60 updateHead(addSub = it)
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030061 }
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030062
Roman Elizarov89f8ff72018-03-14 13:39:03 +030063 public override fun close(cause: Throwable?): Boolean {
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030064 if (!super.close(cause)) return false
65 checkSubOffers()
66 return true
67 }
68
Roman Elizarov89f8ff72018-03-14 13:39:03 +030069 public override fun cancel(cause: Throwable?): Boolean =
70 close(cause).also {
Vsevolod Tolstopyatov167ca632018-06-29 11:49:00 +030071 for (sub in subscribers) sub.cancel(cause)
Roman Elizarov89f8ff72018-03-14 13:39:03 +030072 }
73
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030074 // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
75 override fun offerInternal(element: E): Any {
76 bufferLock.withLock {
77 // check if closed for send (under lock, so size cannot change)
78 closedForSend?.let { return it }
79 val size = this.size
80 if (size >= capacity) return OFFER_FAILED
81 val tail = this.tail
82 buffer[(tail % capacity).toInt()] = element
83 this.size = size + 1
84 this.tail = tail + 1
85 }
Vsevolod Tolstopyatov167ca632018-06-29 11:49:00 +030086 // if offered successfully, then check subscribers outside of lock
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030087 checkSubOffers()
88 return OFFER_SUCCESS
89 }
90
91 // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
92 override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
93 bufferLock.withLock {
94 // check if closed for send (under lock, so size cannot change)
95 closedForSend?.let { return it }
96 val size = this.size
97 if (size >= capacity) return OFFER_FAILED
98 // let's try to select sending this element to buffer
99 if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
100 return ALREADY_SELECTED
101 }
102 val tail = this.tail
103 buffer[(tail % capacity).toInt()] = element
104 this.size = size + 1
105 this.tail = tail + 1
106 }
Vsevolod Tolstopyatov167ca632018-06-29 11:49:00 +0300107 // if offered successfully, then check subscribers outside of lock
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300108 checkSubOffers()
109 return OFFER_SUCCESS
110 }
111
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300112 private fun checkSubOffers() {
113 var updated = false
Roman Elizarov921b0cf2017-06-22 14:36:55 +0300114 var hasSubs = false
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300115 @Suppress("LoopToCallChain") // must invoke `checkOffer` on every sub
Vsevolod Tolstopyatov167ca632018-06-29 11:49:00 +0300116 for (sub in subscribers) {
Roman Elizarov921b0cf2017-06-22 14:36:55 +0300117 hasSubs = true
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300118 if (sub.checkOffer()) updated = true
119 }
Roman Elizarov921b0cf2017-06-22 14:36:55 +0300120 if (updated || !hasSubs)
Roman Elizarov9525f3c2017-10-27 12:11:55 +0300121 updateHead()
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300122 }
123
Roman Elizarov9525f3c2017-10-27 12:11:55 +0300124 // updates head if needed and optionally adds / removes subscriber under the same lock
125 private tailrec fun updateHead(addSub: Subscriber<E>? = null, removeSub: Subscriber<E>? = null) {
Roman Elizarov43918972017-10-07 21:00:06 +0300126 // update head in a tail rec loop
127 var send: Send? = null
128 var token: Any? = null
129 bufferLock.withLock {
Roman Elizarov9525f3c2017-10-27 12:11:55 +0300130 if (addSub != null) {
131 addSub.subHead = tail // start from last element
Vsevolod Tolstopyatov167ca632018-06-29 11:49:00 +0300132 val wasEmpty = subscribers.isEmpty()
133 subscribers.add(addSub)
Roman Elizarov9525f3c2017-10-27 12:11:55 +0300134 if (!wasEmpty) return // no need to update when adding second and etc sub
135 }
Roman Elizarov43918972017-10-07 21:00:06 +0300136 if (removeSub != null) {
Vsevolod Tolstopyatov167ca632018-06-29 11:49:00 +0300137 subscribers.remove(removeSub)
Roman Elizarov43918972017-10-07 21:00:06 +0300138 if (head != removeSub.subHead) return // no need to update
139 }
140 val minHead = computeMinHead()
141 val tail = this.tail
142 var head = this.head
143 val targetHead = minHead.coerceAtMost(tail)
144 if (targetHead <= head) return // nothing to do -- head was already moved
145 var size = this.size
146 // clean up removed (on not need if we don't have any subscribers anymore)
147 while (head < targetHead) {
148 buffer[(head % capacity).toInt()] = null
149 val wasFull = size >= capacity
150 // update the size before checking queue (no more senders can queue up)
151 this.head = ++head
152 this.size = --size
153 if (wasFull) {
154 while (true) {
155 send = takeFirstSendOrPeekClosed() ?: break // when when no sender
156 if (send is Closed<*>) break // break when closed for send
157 token = send!!.tryResumeSend(idempotent = null)
158 if (token != null) {
159 // put sent element to the buffer
160 buffer[(tail % capacity).toInt()] = (send as Send).pollResult
161 this.size = size + 1
162 this.tail = tail + 1
163 return@withLock // go out of lock to wakeup this sender
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300164 }
165 }
166 }
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300167 }
Roman Elizarov43918972017-10-07 21:00:06 +0300168 return // done updating here -> return
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300169 }
Roman Elizarov43918972017-10-07 21:00:06 +0300170 // we only get out of the lock normally when there is a sender to resume
171 send!!.completeResumeSend(token!!)
172 // since we've just sent an element, we might need to resume some receivers
173 checkSubOffers()
174 // tailrec call to recheck
Roman Elizarov9525f3c2017-10-27 12:11:55 +0300175 updateHead()
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300176 }
177
178 private fun computeMinHead(): Long {
179 var minHead = Long.MAX_VALUE
Vsevolod Tolstopyatov167ca632018-06-29 11:49:00 +0300180 for (sub in subscribers)
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300181 minHead = minHead.coerceAtMost(sub.subHead) // volatile (atomic) reads of subHead
182 return minHead
183 }
184
185 @Suppress("UNCHECKED_CAST")
186 private fun elementAt(index: Long): E = buffer[(index % capacity).toInt()] as E
187
188 private class Subscriber<E>(
Roman Elizarov9525f3c2017-10-27 12:11:55 +0300189 private val broadcastChannel: ArrayBroadcastChannel<E>
Roman Elizarovf5f09832018-05-16 15:10:28 +0300190 ) : AbstractChannel<E>(), ReceiveChannel<E>, SubscriptionReceiveChannel<E> {
Roman Elizarov43918972017-10-07 21:00:06 +0300191 private val subLock = ReentrantLock()
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300192
Vsevolod Tolstopyatov96191342018-04-20 18:13:33 +0300193 @Volatile
194 @JvmField
Roman Elizarov9525f3c2017-10-27 12:11:55 +0300195 var subHead: Long = 0 // guarded by subLock
196
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300197 override val isBufferAlwaysEmpty: Boolean get() = false
198 override val isBufferEmpty: Boolean get() = subHead >= broadcastChannel.tail
199 override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
200 override val isBufferFull: Boolean get() = error("Should not be used")
201
Roman Elizarovb555d912017-08-17 21:01:33 +0300202 override fun cancel(cause: Throwable?): Boolean =
203 close(cause).also { closed ->
204 if (closed) broadcastChannel.updateHead(removeSub = this)
Roman Elizarov89f8ff72018-03-14 13:39:03 +0300205 clearBuffer()
Roman Elizarovb555d912017-08-17 21:01:33 +0300206 }
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300207
Roman Elizarov89f8ff72018-03-14 13:39:03 +0300208 private fun clearBuffer() {
209 subLock.withLock {
210 subHead = broadcastChannel.tail
211 }
212 }
213
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300214 // returns true if subHead was updated and broadcast channel's head must be checked
215 // this method is lock-free (it never waits on lock)
216 @Suppress("UNCHECKED_CAST")
217 fun checkOffer(): Boolean {
218 var updated = false
219 var closed: Closed<*>? = null
220 loop@
221 while (needsToCheckOfferWithoutLock()) {
222 // just use `tryLock` here and break when some other thread is checking under lock
223 // it means that `checkOffer` must be retried after every `unlock`
Roman Elizarov43918972017-10-07 21:00:06 +0300224 if (!subLock.tryLock()) break
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300225 val receive: ReceiveOrClosed<E>?
226 val token: Any?
227 try {
228 val result = peekUnderLock()
229 when {
230 result === POLL_FAILED -> continue@loop // must retest `needsToCheckOfferWithoutLock` outside of the lock
231 result is Closed<*> -> {
232 closed = result
233 break@loop // was closed
234 }
235 }
236 // find a receiver for an element
237 receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving
238 if (receive is Closed<*>) break // noting more to do if this sub already closed
239 token = receive.tryResumeReceive(result as E, idempotent = null)
240 if (token == null) continue // bail out here to next iteration (see for next receiver)
241 val subHead = this.subHead
242 this.subHead = subHead + 1 // retrieved element for this subscriber
243 updated = true
244 } finally {
Roman Elizarov43918972017-10-07 21:00:06 +0300245 subLock.unlock()
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300246 }
247 receive!!.completeResumeReceive(token!!)
248 }
249 // do close outside of lock if needed
250 closed?.also { close(cause = it.closeCause) }
251 return updated
252 }
253
254 // result is `E | POLL_FAILED | Closed`
255 override fun pollInternal(): Any? {
256 var updated = false
Roman Elizarov43918972017-10-07 21:00:06 +0300257 val result = subLock.withLock {
258 val result = peekUnderLock()
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300259 when {
260 result is Closed<*> -> { /* just bail out of lock */ }
261 result === POLL_FAILED -> { /* just bail out of lock */ }
262 else -> {
263 // update subHead after retrieiving element from buffer
264 val subHead = this.subHead
265 this.subHead = subHead + 1
266 updated = true
267 }
268 }
Roman Elizarov43918972017-10-07 21:00:06 +0300269 result
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300270 }
271 // do close outside of lock
272 (result as? Closed<*>)?.also { close(cause = it.closeCause) }
273 // there could have been checkOffer attempt while we were holding lock
274 // now outside the lock recheck if anything else to offer
275 if (checkOffer())
276 updated = true
277 // and finally update broadcast's channel head if needed
278 if (updated)
Roman Elizarov9525f3c2017-10-27 12:11:55 +0300279 broadcastChannel.updateHead()
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300280 return result
281 }
282
283 // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
284 override fun pollSelectInternal(select: SelectInstance<*>): Any? {
285 var updated = false
Roman Elizarov43918972017-10-07 21:00:06 +0300286 val result = subLock.withLock {
287 var result = peekUnderLock()
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300288 when {
289 result is Closed<*> -> { /* just bail out of lock */ }
290 result === POLL_FAILED -> { /* just bail out of lock */ }
291 else -> {
292 // let's try to select receiving this element from buffer
293 if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
294 result = ALREADY_SELECTED
295 } else {
296 // update subHead after retrieiving element from buffer
297 val subHead = this.subHead
298 this.subHead = subHead + 1
299 updated = true
300 }
301 }
302 }
Roman Elizarov43918972017-10-07 21:00:06 +0300303 result
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300304 }
305 // do close outside of lock
306 (result as? Closed<*>)?.also { close(cause = it.closeCause) }
307 // there could have been checkOffer attempt while we were holding lock
308 // now outside the lock recheck if anything else to offer
309 if (checkOffer())
310 updated = true
311 // and finally update broadcast's channel head if needed
312 if (updated)
Roman Elizarov9525f3c2017-10-27 12:11:55 +0300313 broadcastChannel.updateHead()
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300314 return result
315 }
316
317 // Must invoke this check this after lock, because offer's invocation of `checkOffer` might have failed
318 // to `tryLock` just before the lock was about to unlocked, thus loosing notification to this
319 // subscription about an element that was just offered
320 private fun needsToCheckOfferWithoutLock(): Boolean {
321 if (closedForReceive != null)
322 return false // already closed -> nothing to do
323 if (isBufferEmpty && broadcastChannel.closedForReceive == null)
324 return false // no data for us && broadcast channel was not closed yet -> nothing to do
325 return true // check otherwise
326 }
327
328 // guarded by lock, returns:
329 // E - the element from the buffer at subHead
330 // Closed<*> when closed;
331 // POLL_FAILED when there seems to be no data, but must retest `needsToCheckOfferWithoutLock` outside of lock
332 private fun peekUnderLock(): Any? {
333 val subHead = this.subHead // guarded read (can be non-volatile read)
334 // note: from the broadcastChannel we must read closed token first, then read its tail
335 // because it is Ok if tail moves in between the reads (we make decision based on tail first)
Roman Elizarovc2adef52017-08-17 16:13:41 +0300336 val closedBroadcast = broadcastChannel.closedForReceive // unguarded volatile read
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300337 val tail = broadcastChannel.tail // unguarded volatile read
338 if (subHead >= tail) {
Roman Elizarovb2b5c062018-02-07 12:55:44 +0300339 // no elements to poll from the queue -- check if closed broads & closed this sub
340 // must retest `needsToCheckOfferWithoutLock` outside of the lock
341 return closedBroadcast ?: this.closedForReceive ?: POLL_FAILED
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300342 }
Roman Elizarovc2adef52017-08-17 16:13:41 +0300343 // Get tentative result. This result may be wrong (completely invalid value, including null),
344 // because this subscription might get closed, moving channel's head past this subscription's head.
345 val result = broadcastChannel.elementAt(subHead)
346 // now check if this subscription was closed
347 val closedSub = this.closedForReceive
348 if (closedSub != null) return closedSub
349 // we know the subscription was not closed, so this tentative result is Ok to return
350 return result
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300351 }
352 }
Roman Elizarove4b6f092018-03-06 13:37:42 +0300353
354 // ------ debug ------
355
356 override val bufferDebugString: String
357 get() = "(buffer:capacity=${buffer.size},size=$size)"
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300358}