Roman Elizarov | f16fd27 | 2017-02-07 11:26:00 +0300 | [diff] [blame] | 1 | /* |
Roman Elizarov | 1f74a2d | 2018-06-29 19:19:45 +0300 | [diff] [blame^] | 2 | * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
Roman Elizarov | f16fd27 | 2017-02-07 11:26:00 +0300 | [diff] [blame] | 3 | */ |
| 4 | |
Roman Elizarov | 7b2d8b0 | 2017-02-02 20:09:14 +0300 | [diff] [blame] | 5 | package kotlinx.coroutines.experimental.channels |
| 6 | |
Vsevolod Tolstopyatov | 9619134 | 2018-04-20 18:13:33 +0300 | [diff] [blame] | 7 | import kotlinx.coroutines.experimental.internal.* |
| 8 | import kotlinx.coroutines.experimental.internalAnnotations.Volatile |
Roman Elizarov | e4b6f09 | 2018-03-06 13:37:42 +0300 | [diff] [blame] | 9 | import kotlinx.coroutines.experimental.selects.* |
Roman Elizarov | 7b2d8b0 | 2017-02-02 20:09:14 +0300 | [diff] [blame] | 10 | |
/**
 * Channel with array buffer of a fixed [capacity].
 * Sender suspends only when the buffer is full and receiver suspends only when the buffer is empty.
 *
 * This channel is created by `Channel(capacity)` factory function invocation.
 *
 * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
 * The lists of suspended senders or receivers are lock-free.
 */
public open class ArrayChannel<E>(
    /**
     * Buffer capacity.
     */
    val capacity: Int
) : AbstractChannel<E>() {
    init {
        require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
    }

    // Guards `buffer`, `head` and every write to `size`.
    private val lock = ReentrantLock()
    // Circular buffer: the oldest element lives at `head`, the next free slot at `(head + size) % capacity`.
    private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
    private var head: Int = 0
    // Volatile because the `isBufferEmpty` / `isBufferFull` getters below read it without taking `lock`.
    @Volatile
    private var size: Int = 0

    protected final override val isBufferAlwaysEmpty: Boolean get() = false
    protected final override val isBufferEmpty: Boolean get() = size == 0
    protected final override val isBufferAlwaysFull: Boolean get() = false
    protected final override val isBufferFull: Boolean get() = size == capacity

    /**
     * Tries to send [element] without suspension.
     *
     * Under the lock: returns `closedForSend` when it is non-null; returns `OFFER_FAILED` when the buffer is full.
     * Otherwise, if the buffer was empty, tries to hand the element directly to a queued receiver, and falls back
     * to storing the element in the buffer when no receiver accepts it.
     *
     * Result is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
     */
    protected override fun offerInternal(element: E): Any {
        var receive: ReceiveOrClosed<E>? = null
        var token: Any? = null
        lock.withLock {
            val size = this.size
            closedForSend?.let { return it }
            if (size < capacity) {
                // tentatively put element to buffer
                // NOTE(review): presumably the early size update lets concurrently enqueuing receivers
                // observe a non-empty buffer -- confirm against AbstractChannel.
                this.size = size + 1 // update size before checking queue (!!!)
                // check for receivers that were waiting on empty queue
                if (size == 0) {
                    loop@ while (true) {
                        receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
                        if (receive is Closed) {
                            this.size = size // restore size
                            return receive!!
                        }
                        token = receive!!.tryResumeReceive(element, idempotent = null)
                        if (token != null) {
                            // element goes straight to the receiver, so it never occupies a buffer slot
                            this.size = size // restore size
                            return@withLock
                        }
                    }
                }
                buffer[(head + size) % capacity] = element // actually queue element
                return OFFER_SUCCESS
            }
            // size == capacity: full
            return OFFER_FAILED
        }
        // breaks here if offer meets receiver; the resume is completed outside of the lock
        receive!!.completeResumeReceive(token!!)
        return receive!!.offerResult
    }

    /**
     * Variant of [offerInternal] used from a `select` expression: the hand-off to a queued receiver and the
     * buffering of [element] are both conditional on [select] being successfully selected.
     *
     * Result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
     */
    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        var receive: ReceiveOrClosed<E>? = null
        var token: Any? = null
        lock.withLock {
            val size = this.size
            closedForSend?.let { return it }
            if (size < capacity) {
                // tentatively put element to buffer
                this.size = size + 1 // update size before checking queue (!!!)
                // check for receivers that were waiting on empty queue
                if (size == 0) {
                    loop@ while (true) {
                        val offerOp = describeTryOffer(element)
                        val failure = select.performAtomicTrySelect(offerOp)
                        when {
                            failure == null -> { // offered successfully
                                this.size = size // restore size
                                receive = offerOp.result
                                token = offerOp.resumeToken
                                check(token != null)
                                return@withLock
                            }
                            failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
                            failure === ALREADY_SELECTED || failure is Closed<*> -> {
                                this.size = size // restore size
                                return failure
                            }
                            else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
                        }
                    }
                }
                // let's try to select sending this element to buffer
                if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
                    this.size = size // restore size
                    return ALREADY_SELECTED
                }
                buffer[(head + size) % capacity] = element // actually queue element
                return OFFER_SUCCESS
            }
            // size == capacity: full
            return OFFER_FAILED
        }
        // breaks here if offer meets receiver; the resume is completed outside of the lock
        receive!!.completeResumeReceive(token!!)
        return receive!!.offerResult
    }

    /**
     * Tries to receive an element without suspension.
     *
     * Under the lock: when the buffer is empty returns `closedForSend ?: POLL_FAILED`. Otherwise removes the
     * element at `head` and, if the buffer was full, tries to resume one queued sender and move its element
     * into the slot that has just been freed.
     *
     * Result is `E | POLL_FAILED | Closed`.
     */
    protected override fun pollInternal(): Any? {
        var send: Send? = null
        var token: Any? = null
        var result: Any? = null
        lock.withLock {
            val size = this.size
            if (size == 0) return closedForSend ?: POLL_FAILED // when nothing can be read from buffer
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null
            this.size = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_FAILED
            if (size == capacity) {
                loop@ while (true) {
                    send = takeFirstSendOrPeekClosed() ?: break
                    token = send!!.tryResumeSend(idempotent = null)
                    if (token != null) {
                        replacement = send!!.pollResult
                        break@loop
                    }
                }
            }
            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
                this.size = size // restore size
                // `head` has not been advanced yet, so with a full buffer this index is the slot just vacated
                buffer[(head + size) % capacity] = replacement
            }
            head = (head + 1) % capacity
        }
        // complete the send that we've taken the replacement from (outside of the lock)
        if (token != null)
            send!!.completeResumeSend(token!!)
        return result
    }

    /**
     * Variant of [pollInternal] used from a `select` expression. Whenever [select] turns out to be already
     * selected, both `size` and the element at `head` are restored before returning `ALREADY_SELECTED`.
     *
     * Result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`.
     */
    protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
        var send: Send? = null
        var token: Any? = null
        var result: Any? = null
        lock.withLock {
            val size = this.size
            if (size == 0) return closedForSend ?: POLL_FAILED
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null
            this.size = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_FAILED
            if (size == capacity) {
                loop@ while (true) {
                    val pollOp = describeTryPoll()
                    val failure = select.performAtomicTrySelect(pollOp)
                    when {
                        failure == null -> { // polled successfully
                            send = pollOp.result
                            token = pollOp.resumeToken
                            check(token != null)
                            replacement = send!!.pollResult
                            break@loop
                        }
                        failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
                        failure === ALREADY_SELECTED -> {
                            this.size = size // restore size
                            buffer[head] = result // restore head
                            return failure
                        }
                        failure is Closed<*> -> {
                            send = failure
                            token = failure.tryResumeSend(idempotent = null)
                            replacement = failure
                            break@loop
                        }
                        // the attempted operation here is describeTryPoll, so report it under that name
                        else -> error("performAtomicTrySelect(describeTryPoll) returned $failure")
                    }
                }
            }
            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
                this.size = size // restore size
                buffer[(head + size) % capacity] = replacement
            } else {
                // failed to poll or is already closed --> let's try to select receiving this element from buffer
                if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
                    this.size = size // restore size
                    buffer[head] = result // restore head
                    return ALREADY_SELECTED
                }
            }
            head = (head + 1) % capacity
        }
        // complete the send that we've taken the replacement from (outside of the lock)
        if (token != null)
            send!!.completeResumeSend(token!!)
        return result
    }

    /**
     * Drops all buffered elements and then delegates to the superclass to clean the queued senders.
     *
     * Note: this function is invoked when channel is already closed.
     */
    override fun cleanupSendQueueOnCancel() {
        // clear buffer first
        lock.withLock {
            repeat(size) {
                // use `null`, like the poll paths do, so that a cleared slot holds no object at all
                buffer[head] = null
                head = (head + 1) % capacity
            }
            size = 0
        }
        // then clean all queued senders
        super.cleanupSendQueueOnCancel()
    }

    // ------ debug ------

    override val bufferDebugString: String
        get() = "(buffer:capacity=${buffer.size},size=$size)"
}