Roman Elizarov | f16fd27 | 2017-02-07 11:26:00 +0300 | [diff] [blame] | 1 | /* |
Roman Elizarov | 583d39d | 2019-07-02 16:21:22 +0300 | [diff] [blame] | 2 | * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
Roman Elizarov | f16fd27 | 2017-02-07 11:26:00 +0300 | [diff] [blame] | 3 | */ |
| 4 | |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 5 | package kotlinx.coroutines.channels |
Roman Elizarov | 7b2d8b0 | 2017-02-02 20:09:14 +0300 | [diff] [blame] | 6 | |
Roman Elizarov | 583d39d | 2019-07-02 16:21:22 +0300 | [diff] [blame] | 7 | import kotlinx.coroutines.* |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 8 | import kotlinx.coroutines.internal.* |
| 9 | import kotlinx.coroutines.selects.* |
Vsevolod Tolstopyatov | 36c3ba1 | 2018-10-14 23:45:28 +0300 | [diff] [blame] | 10 | import kotlin.jvm.* |
Vsevolod Tolstopyatov | c7e9b56 | 2019-08-09 17:33:55 +0300 | [diff] [blame^] | 11 | import kotlin.math.* |
Roman Elizarov | 7b2d8b0 | 2017-02-02 20:09:14 +0300 | [diff] [blame] | 12 | |
/**
 * Channel with array buffer of a fixed [capacity].
 * Sender suspends only when buffer is full and receiver suspends only when buffer is empty.
 *
 * This channel is created by `Channel(capacity)` factory function invocation.
 *
 * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
 * The lists of suspended senders or receivers are lock-free.
 *
 * The backing array is used as a circular buffer: [head] is the index of the oldest element and the element
 * at logical position `i` lives at `(head + i) % buffer.size`. The array starts small and is grown on demand
 * by [ensureCapacity], never beyond [capacity].
 */
internal open class ArrayChannel<E>(
    /**
     * Buffer capacity.
     */
    val capacity: Int
) : AbstractChannel<E>() {
    init {
        require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
    }

    private val lock = ReentrantLock()

    /*
     * Guarded by lock.
     * Allocate minimum of capacity and 8 to avoid excess memory pressure for large channels when it's not necessary.
     */
    private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))

    // Index of the oldest buffered element inside `buffer`. Only accessed while holding `lock`.
    private var head: Int = 0

    // Number of buffered elements. Written under `lock`, but volatile because the `isBuffer*` properties
    // below read it without taking the lock.
    @Volatile
    private var size: Int = 0 // Invariant: size <= capacity

    protected final override val isBufferAlwaysEmpty: Boolean get() = false
    protected final override val isBufferEmpty: Boolean get() = size == 0
    protected final override val isBufferAlwaysFull: Boolean get() = false
    protected final override val isBufferFull: Boolean get() = size == capacity

    /**
     * Tries to send [element] without suspending.
     *
     * Under [lock]: returns the `closedForSend` token if there is one; returns `OFFER_FAILED` if the buffer
     * is full; otherwise, if the buffer was empty, first tries to hand the element directly to a queued
     * receiver, and only when no receiver accepts it stores the element in the buffer.
     *
     * The receiver hand-off is completed outside of the lock.
     *
     * @return result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
     */
    protected override fun offerInternal(element: E): Any {
        var receive: ReceiveOrClosed<E>? = null
        var token: Any? = null
        lock.withLock {
            val size = this.size
            closedForSend?.let { return it }
            if (size < capacity) {
                // tentatively put element to buffer
                this.size = size + 1 // update size before checking queue (!!!)
                // check for receivers that were waiting on empty queue
                if (size == 0) {
                    loop@ while (true) {
                        receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
                        if (receive is Closed) {
                            this.size = size // restore size
                            return receive!!
                        }
                        token = receive!!.tryResumeReceive(element, idempotent = null)
                        if (token != null) {
                            // element goes straight to the receiver, so it never occupies a buffer slot
                            this.size = size // restore size
                            return@withLock
                        }
                    }
                }
                ensureCapacity(size)
                buffer[(head + size) % buffer.size] = element // actually queue element
                return OFFER_SUCCESS
            }
            // size == capacity: full
            return OFFER_FAILED
        }
        // breaks here if offer meets receiver
        receive!!.completeResumeReceive(token!!)
        return receive!!.offerResult
    }

    /**
     * Variant of [offerInternal] used from a `select` expression.
     *
     * Follows the same steps as [offerInternal], but the hand-off to a queued receiver is performed through
     * `select.performAtomicTrySelect(describeTryOffer(element))`, and storing the element in the buffer is
     * conditional on `select.trySelect(null)` succeeding. Whenever the element ends up not being buffered,
     * the tentatively incremented [size] is restored.
     *
     * @return result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
     */
    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        var receive: ReceiveOrClosed<E>? = null
        var token: Any? = null
        lock.withLock {
            val size = this.size
            closedForSend?.let { return it }
            if (size < capacity) {
                // tentatively put element to buffer
                this.size = size + 1 // update size before checking queue (!!!)
                // check for receivers that were waiting on empty queue
                if (size == 0) {
                    loop@ while (true) {
                        val offerOp = describeTryOffer(element)
                        val failure = select.performAtomicTrySelect(offerOp)
                        when {
                            failure == null -> { // offered successfully
                                this.size = size // restore size
                                receive = offerOp.result
                                token = offerOp.resumeToken
                                assert { token != null }
                                return@withLock
                            }
                            failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
                            failure === ALREADY_SELECTED || failure is Closed<*> -> {
                                this.size = size // restore size
                                return failure
                            }
                            else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
                        }
                    }
                }
                // let's try to select sending this element to buffer
                if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
                    this.size = size // restore size
                    return ALREADY_SELECTED
                }
                ensureCapacity(size)
                buffer[(head + size) % buffer.size] = element // actually queue element
                return OFFER_SUCCESS
            }
            // size == capacity: full
            return OFFER_FAILED
        }
        // breaks here if offer meets receiver
        receive!!.completeResumeReceive(token!!)
        return receive!!.offerResult
    }

    /**
     * Grows the backing array when it has no free slot for one more element.
     *
     * When [currentSize] has reached `buffer.size`, a new array of twice the size (but at most [capacity])
     * is allocated, the [currentSize] buffered elements are copied into it in logical order starting at
     * index 0, and [head] is reset to 0.
     *
     * Guarded by lock.
     *
     * @param currentSize number of elements currently stored in the buffer
     */
    private fun ensureCapacity(currentSize: Int) {
        if (currentSize >= buffer.size) {
            val oldSize = buffer.size
            // Same result as min(oldSize * 2, capacity), but phrased as a subtraction so that it cannot
            // overflow Int (and yield a negative array size) when the buffer is already very large.
            val newSize = if (oldSize >= capacity - oldSize) capacity else oldSize * 2
            val newBuffer = arrayOfNulls<Any?>(newSize)
            for (i in 0 until currentSize) {
                newBuffer[i] = buffer[(head + i) % oldSize]
            }
            buffer = newBuffer
            head = 0
        }
    }

    /**
     * Tries to receive an element without suspending.
     *
     * Under [lock]: if the buffer is empty returns the `closedForSend` token or `POLL_FAILED`; otherwise
     * takes the element at [head]. If the buffer was full, it also tries to resume the first queued sender
     * and, when that sender provides a regular element, appends that element to the buffer so that [size]
     * stays unchanged.
     *
     * The resumed sender is completed outside of the lock.
     *
     * @return result is `E | POLL_FAILED | Closed`
     */
    protected override fun pollInternal(): Any? {
        var send: Send? = null
        var token: Any? = null
        var result: Any? = null
        lock.withLock {
            val size = this.size
            if (size == 0) return closedForSend ?: POLL_FAILED // when nothing can be read from buffer
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null
            this.size = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_FAILED
            if (size == capacity) {
                loop@ while (true) {
                    send = takeFirstSendOrPeekClosed() ?: break
                    token = send!!.tryResumeSend(idempotent = null)
                    if (token != null) {
                        replacement = send!!.pollResult
                        break@loop
                    }
                }
            }
            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
                this.size = size // restore size
                // head has not been advanced yet, so (head + size) is the slot right after the last element
                buffer[(head + size) % buffer.size] = replacement
            }
            head = (head + 1) % buffer.size
        }
        // complete the send that the replacement was taken from
        if (token != null)
            send!!.completeResumeSend(token!!)
        return result
    }

    /**
     * Variant of [pollInternal] used from a `select` expression.
     *
     * Follows the same steps as [pollInternal], but a queued sender is taken through
     * `select.performAtomicTrySelect(describeTryPoll())`. When no replacement element was obtained from a
     * sender, taking the element from the buffer is conditional on `select.trySelect(null)` succeeding.
     * On `ALREADY_SELECTED` both [size] and the element at [head] are restored.
     *
     * @return result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
     */
    protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
        var send: Send? = null
        var token: Any? = null
        var result: Any? = null
        lock.withLock {
            val size = this.size
            if (size == 0) return closedForSend ?: POLL_FAILED
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null
            this.size = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_FAILED
            if (size == capacity) {
                loop@ while (true) {
                    val pollOp = describeTryPoll()
                    val failure = select.performAtomicTrySelect(pollOp)
                    when {
                        failure == null -> { // polled successfully
                            send = pollOp.result
                            token = pollOp.resumeToken
                            assert { token != null }
                            replacement = send!!.pollResult
                            break@loop
                        }
                        failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
                        failure === ALREADY_SELECTED -> {
                            this.size = size // restore size
                            buffer[head] = result // restore head
                            return failure
                        }
                        failure is Closed<*> -> {
                            send = failure
                            token = failure.tryResumeSend(idempotent = null)
                            replacement = failure
                            break@loop
                        }
                        else -> error("performAtomicTrySelect(describeTryPoll) returned $failure")
                    }
                }
            }
            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
                this.size = size // restore size
                // head has not been advanced yet, so (head + size) is the slot right after the last element
                buffer[(head + size) % buffer.size] = replacement
            } else {
                // failed to poll or is already closed --> let's try to select receiving this element from buffer
                if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
                    this.size = size // restore size
                    buffer[head] = result // restore head
                    return ALREADY_SELECTED
                }
            }
            head = (head + 1) % buffer.size
        }
        // complete the send that the replacement was taken from
        if (token != null)
            send!!.completeResumeSend(token!!)
        return result
    }

    /**
     * Drops all buffered elements and then delegates to the superclass to clean the queued senders.
     *
     * Note: this function is invoked when channel is already closed
     */
    override fun cleanupSendQueueOnCancel() {
        // clear buffer first
        lock.withLock {
            repeat(size) {
                buffer[head] = null // drop the reference to the buffered element
                head = (head + 1) % buffer.size
            }
            size = 0
        }
        // then clean all queued senders
        super.cleanupSendQueueOnCancel()
    }

    // ------ debug ------

    override val bufferDebugString: String
        get() = "(buffer:capacity=$capacity,size=$size)"
}