blob: 1e1c0d3ae4c4aeae519aa35c87cb1c624f70fab9 [file] [log] [blame]
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
Roman Elizarov0950dfa2018-07-13 10:33:25 +03005package kotlinx.coroutines.channels
Roman Elizarov7b2d8b02017-02-02 20:09:14 +03006
Roman Elizarov583d39d2019-07-02 16:21:22 +03007import kotlinx.coroutines.*
Roman Elizarov0950dfa2018-07-13 10:33:25 +03008import kotlinx.coroutines.internal.*
9import kotlinx.coroutines.selects.*
Vsevolod Tolstopyatov36c3ba12018-10-14 23:45:28 +030010import kotlin.jvm.*
Vsevolod Tolstopyatovc7e9b562019-08-09 17:33:55 +030011import kotlin.math.*
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030012
/**
 * Channel with array buffer of a fixed [capacity].
 * Sender suspends only when buffer is full and receiver suspends only when buffer is empty.
 *
 * This channel is created by `Channel(capacity)` factory function invocation.
 *
 * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
 * The lists of suspended senders or receivers are lock-free.
 **/
internal open class ArrayChannel<E>(
    /**
     * Buffer capacity.
     */
    val capacity: Int
) : AbstractChannel<E>() {
    init {
        require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
    }

    private val lock = ReentrantLock()

    /*
     * Guarded by lock.
     * Circular buffer: the i-th buffered element lives at index `(head + i) % buffer.size`.
     * Allocate minimum of capacity and 8 to avoid excess memory pressure for large channels when it's not necessary;
     * the array is grown on demand by ensureCapacity, never beyond capacity.
     */
    private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))

    // Guarded by lock. Index in buffer of the oldest element, i.e. the next one to be polled.
    private var head: Int = 0

    // Written only while holding lock; read without the lock by the isBuffer* properties and bufferDebugString.
    @Volatile
    private var size: Int = 0 // Invariant: size <= capacity

    protected final override val isBufferAlwaysEmpty: Boolean get() = false
    protected final override val isBufferEmpty: Boolean get() = size == 0
    protected final override val isBufferAlwaysFull: Boolean get() = false
    protected final override val isBufferFull: Boolean get() = size == capacity

    /**
     * Tries to hand [element] to a waiting receiver or, failing that, to store it in the buffer.
     *
     * Receivers are only looked for when the buffer was empty on entry; otherwise the element is appended
     * to the buffer as long as there is room.
     *
     * @return `OFFER_SUCCESS | OFFER_FAILED | Closed`, or the `offerResult` of the receiver that took the element.
     */
    protected override fun offerInternal(element: E): Any {
        var receive: ReceiveOrClosed<E>? = null
        var token: Any? = null
        lock.withLock {
            val size = this.size
            closedForSend?.let { return it }
            if (size < capacity) {
                // tentatively put element to buffer
                this.size = size + 1 // update size before checking queue (!!!)
                // check for receivers that were waiting on empty queue
                if (size == 0) {
                    loop@ while (true) {
                        receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
                        if (receive is Closed) {
                            this.size = size // restore size
                            return receive!!
                        }
                        token = receive!!.tryResumeReceive(element, idempotent = null)
                        if (token != null) {
                            // receiver accepted the element: nothing goes to the buffer
                            this.size = size // restore size
                            return@withLock
                        }
                    }
                }
                ensureCapacity(size)
                buffer[(head + size) % buffer.size] = element // actually queue element
                return OFFER_SUCCESS
            }
            // size == capacity: full
            return OFFER_FAILED
        }
        // gets here only if offer met a receiver; resumption is completed outside of the lock
        receive!!.completeResumeReceive(token!!)
        return receive!!.offerResult
    }

    /**
     * Select-aware variant of [offerInternal]: every outcome that would consume [element] is additionally
     * conditioned on [select] being successfully selected.
     *
     * @return `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`,
     * or the `offerResult` of the receiver that took the element.
     */
    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        var receive: ReceiveOrClosed<E>? = null
        var token: Any? = null
        lock.withLock {
            val size = this.size
            closedForSend?.let { return it }
            if (size < capacity) {
                // tentatively put element to buffer
                this.size = size + 1 // update size before checking queue (!!!)
                // check for receivers that were waiting on empty queue
                if (size == 0) {
                    loop@ while (true) {
                        val offerOp = describeTryOffer(element)
                        val failure = select.performAtomicTrySelect(offerOp)
                        when {
                            failure == null -> { // offered successfully
                                this.size = size // restore size
                                receive = offerOp.result
                                token = offerOp.resumeToken
                                assert { token != null }
                                return@withLock
                            }
                            failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
                            failure === ALREADY_SELECTED || failure is Closed<*> -> {
                                this.size = size // restore size
                                return failure
                            }
                            else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
                        }
                    }
                }
                // let's try to select sending this element to buffer
                if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
                    this.size = size // restore size
                    return ALREADY_SELECTED
                }
                ensureCapacity(size)
                buffer[(head + size) % buffer.size] = element // actually queue element
                return OFFER_SUCCESS
            }
            // size == capacity: full
            return OFFER_FAILED
        }
        // gets here only if offer met a receiver; resumption is completed outside of the lock
        receive!!.completeResumeReceive(token!!)
        return receive!!.offerResult
    }

    /**
     * Grows [buffer] when [currentSize] has reached its current length.
     *
     * The new length is twice the old one, capped at [capacity]. The [currentSize] buffered elements are copied
     * in queue order to the start of the new array and [head] is reset to 0.
     *
     * Must be called while holding [lock].
     */
    private fun ensureCapacity(currentSize: Int) {
        if (currentSize >= buffer.size) {
            // Double in Long arithmetic: `buffer.size * 2` would overflow Int for very large buffers
            // and produce a negative array size.
            val newSize = min(buffer.size.toLong() * 2, capacity.toLong()).toInt()
            val newBuffer = arrayOfNulls<Any?>(newSize)
            for (i in 0 until currentSize) {
                newBuffer[i] = buffer[(head + i) % buffer.size]
            }
            buffer = newBuffer
            head = 0
        }
    }

    /**
     * Takes the oldest element out of the buffer.
     *
     * If the buffer was full on entry, the first queued sender that can be resumed has its element
     * moved into the buffer so that the buffer stays full.
     *
     * @return `E | POLL_FAILED | Closed`
     */
    protected override fun pollInternal(): Any? {
        var send: Send? = null
        var token: Any? = null
        var result: Any? = null
        lock.withLock {
            val size = this.size
            if (size == 0) return closedForSend ?: POLL_FAILED // when nothing can be read from buffer
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null
            this.size = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_FAILED
            if (size == capacity) {
                loop@ while (true) {
                    send = takeFirstSendOrPeekClosed() ?: break
                    token = send!!.tryResumeSend(idempotent = null)
                    if (token != null) {
                        replacement = send!!.pollResult
                        break@loop
                    }
                }
            }
            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
                this.size = size // restore size
                // head has not been advanced yet, so this is the slot right after the last buffered element
                buffer[(head + size) % buffer.size] = replacement
            }
            head = (head + 1) % buffer.size
        }
        // complete the send that the replacement was taken from (outside of the lock)
        if (token != null)
            send!!.completeResumeSend(token!!)
        return result
    }

    /**
     * Select-aware variant of [pollInternal]: the element is taken only if [select] is successfully selected,
     * otherwise the buffer is restored to the state it had on entry.
     *
     * @return `ALREADY_SELECTED | E | POLL_FAILED | Closed`
     */
    protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
        var send: Send? = null
        var token: Any? = null
        var result: Any? = null
        lock.withLock {
            val size = this.size
            if (size == 0) return closedForSend ?: POLL_FAILED
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null
            this.size = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_FAILED
            if (size == capacity) {
                loop@ while (true) {
                    val pollOp = describeTryPoll()
                    val failure = select.performAtomicTrySelect(pollOp)
                    when {
                        failure == null -> { // polled successfully
                            send = pollOp.result
                            token = pollOp.resumeToken
                            assert { token != null }
                            replacement = send!!.pollResult
                            break@loop
                        }
                        failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
                        failure === ALREADY_SELECTED -> {
                            this.size = size // restore size
                            buffer[head] = result // restore head
                            return failure
                        }
                        failure is Closed<*> -> {
                            send = failure
                            token = failure.tryResumeSend(idempotent = null)
                            replacement = failure
                            break@loop
                        }
                        else -> error("performAtomicTrySelect(describeTryPoll) returned $failure")
                    }
                }
            }
            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
                this.size = size // restore size
                // head has not been advanced yet, so this is the slot right after the last buffered element
                buffer[(head + size) % buffer.size] = replacement
            } else {
                // failed to poll or is already closed --> let's try to select receiving this element from buffer
                if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
                    this.size = size // restore size
                    buffer[head] = result // restore head
                    return ALREADY_SELECTED
                }
            }
            head = (head + 1) % buffer.size
        }
        // complete the send that the replacement was taken from (outside of the lock)
        if (token != null)
            send!!.completeResumeSend(token!!)
        return result
    }

    /**
     * Drops all buffered elements and then delegates to the superclass to clean the queued senders.
     *
     * Note: this function is invoked when channel is already closed.
     */
    override fun cleanupSendQueueOnCancel() {
        // clear buffer first
        lock.withLock {
            repeat(size) {
                buffer[head] = null // release the element, same as poll does when it empties a slot
                head = (head + 1) % buffer.size
            }
            size = 0
        }
        // then clean all queued senders
        super.cleanupSendQueueOnCancel()
    }

    // ------ debug ------

    override val bufferDebugString: String
        get() = "(buffer:capacity=$capacity,size=$size)"
}