blob: 688125d946f3033458a95d3a18808e23218c518a [file] [log] [blame]
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
Roman Elizarov0950dfa2018-07-13 10:33:25 +03005package kotlinx.coroutines.channels
Roman Elizarov7b2d8b02017-02-02 20:09:14 +03006
Roman Elizarov583d39d2019-07-02 16:21:22 +03007import kotlinx.coroutines.*
Roman Elizarov0950dfa2018-07-13 10:33:25 +03008import kotlinx.coroutines.internal.*
9import kotlinx.coroutines.selects.*
Vsevolod Tolstopyatov36c3ba12018-10-14 23:45:28 +030010import kotlin.jvm.*
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030011
/**
 * Channel with array buffer of a fixed [capacity].
 * Sender suspends only when buffer is full and receiver suspends only when buffer is empty.
 *
 * This channel is created by `Channel(capacity)` factory function invocation.
 *
 * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
 * The lists of suspended senders or receivers are lock-free.
 */
internal open class ArrayChannel<E>(
    /**
     * Buffer capacity.
     */
    val capacity: Int
) : AbstractChannel<E>() {
    init {
        require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
    }

    // Guards every read/write of `buffer` and `head`, and every write of `size`.
    private val lock = ReentrantLock()

    // Circular storage: the i-th buffered element lives at `buffer[(head + i) % capacity]`.
    private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)

    // Index of the oldest buffered element (the next one to be polled).
    private var head: Int = 0

    // Number of buffered elements. Written only under `lock`, but read without it by
    // `isBufferEmpty` / `isBufferFull`, hence volatile.
    @Volatile
    private var size: Int = 0

    protected final override val isBufferAlwaysEmpty: Boolean get() = false
    protected final override val isBufferEmpty: Boolean get() = size == 0
    protected final override val isBufferAlwaysFull: Boolean get() = false
    protected final override val isBufferFull: Boolean get() = size == capacity

    /**
     * Tries to hand [element] over without suspending.
     *
     * Result is `OFFER_SUCCESS | OFFER_FAILED | Closed`:
     * - the `closedForSend` token when it is non-null;
     * - `OFFER_FAILED` when the buffer already holds [capacity] elements;
     * - `OFFER_SUCCESS` when the element was stored into the buffer;
     * - a queued `Closed` receiver, when one is found at the head of the receive queue;
     * - the receiver's `offerResult` when the buffer was empty and a queued receiver accepted the element directly.
     */
    protected override fun offerInternal(element: E): Any {
        var receive: ReceiveOrClosed<E>? = null
        var token: Any? = null
        lock.withLock {
            val size = this.size
            closedForSend?.let { return it }
            if (size < capacity) {
                // tentatively put element to buffer
                this.size = size + 1 // update size before checking queue (!!!)
                // check for receivers that were waiting on empty queue
                if (size == 0) {
                    loop@ while (true) {
                        receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
                        if (receive is Closed) {
                            this.size = size // restore size
                            return receive!!
                        }
                        token = receive!!.tryResumeReceive(element, idempotent = null)
                        if (token != null) {
                            // the receiver takes the element directly, so nothing is added to the buffer
                            this.size = size // restore size
                            return@withLock
                        }
                        // this receiver could not be resumed -- try the next one
                    }
                }
                buffer[(head + size) % capacity] = element // actually queue element
                return OFFER_SUCCESS
            }
            // size == capacity: full
            return OFFER_FAILED
        }
        // breaks here if offer meets receiver; the resume is completed outside of the lock
        receive!!.completeResumeReceive(token!!)
        return receive!!.offerResult
    }

    /**
     * Variant of [offerInternal] that takes part in a `select` expression via [select].
     *
     * Result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`. In addition to the outcomes of
     * [offerInternal], `ALREADY_SELECTED` is returned when [select] reports that it is already selected, either from
     * `performAtomicTrySelect` or from `trySelect`; in that case the tentative size update is rolled back and
     * nothing is stored.
     */
    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        var receive: ReceiveOrClosed<E>? = null
        var token: Any? = null
        lock.withLock {
            val size = this.size
            closedForSend?.let { return it }
            if (size < capacity) {
                // tentatively put element to buffer
                this.size = size + 1 // update size before checking queue (!!!)
                // check for receivers that were waiting on empty queue
                if (size == 0) {
                    loop@ while (true) {
                        val offerOp = describeTryOffer(element)
                        val failure = select.performAtomicTrySelect(offerOp)
                        when {
                            failure == null -> { // offered successfully
                                this.size = size // restore size
                                receive = offerOp.result
                                token = offerOp.resumeToken
                                assert { token != null }
                                return@withLock
                            }
                            failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
                            failure === ALREADY_SELECTED || failure is Closed<*> -> {
                                this.size = size // restore size
                                return failure
                            }
                            else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
                        }
                    }
                }
                // let's try to select sending this element to buffer
                if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
                    this.size = size // restore size
                    return ALREADY_SELECTED
                }
                buffer[(head + size) % capacity] = element // actually queue element
                return OFFER_SUCCESS
            }
            // size == capacity: full
            return OFFER_FAILED
        }
        // breaks here if offer meets receiver; the resume is completed outside of the lock
        receive!!.completeResumeReceive(token!!)
        return receive!!.offerResult
    }

    /**
     * Tries to take the oldest buffered element without suspending.
     *
     * Result is `E | POLL_FAILED | Closed`: when the buffer is empty this is `closedForSend` if it is non-null and
     * `POLL_FAILED` otherwise; when the buffer is non-empty it is the element at [head].
     *
     * If the buffer was full before the poll, the first queued sender that can be resumed has its `pollResult`
     * moved into the buffer (unless that result is a `Closed` token), so the buffer stays full.
     */
    protected override fun pollInternal(): Any? {
        var send: Send? = null
        var token: Any? = null
        var result: Any? = null
        lock.withLock {
            val size = this.size
            if (size == 0) return closedForSend ?: POLL_FAILED // when nothing can be read from buffer
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null
            this.size = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_FAILED
            if (size == capacity) {
                loop@ while (true) {
                    send = takeFirstSendOrPeekClosed() ?: break
                    token = send!!.tryResumeSend(idempotent = null)
                    if (token != null) {
                        replacement = send!!.pollResult
                        break@loop
                    }
                    // this sender could not be resumed -- try the next one
                }
            }
            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
                this.size = size // restore size
                // this branch is only reachable with size == capacity, so the index is the slot just vacated at `head`
                buffer[(head + size) % capacity] = replacement
            }
            head = (head + 1) % capacity
        }
        // complete the send that we have taken the replacement from (outside of the lock)
        if (token != null)
            send!!.completeResumeSend(token!!)
        return result
    }

    /**
     * Variant of [pollInternal] that takes part in a `select` expression via [select].
     *
     * Result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`. When [select] reports that it is already selected
     * (from `performAtomicTrySelect` or from `trySelect`), the element that was tentatively removed is put back at
     * [head], the size is restored and `ALREADY_SELECTED` is returned.
     */
    protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
        var send: Send? = null
        var token: Any? = null
        var result: Any? = null
        lock.withLock {
            val size = this.size
            if (size == 0) return closedForSend ?: POLL_FAILED
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null
            this.size = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_FAILED
            if (size == capacity) {
                loop@ while (true) {
                    val pollOp = describeTryPoll()
                    val failure = select.performAtomicTrySelect(pollOp)
                    when {
                        failure == null -> { // polled successfully
                            send = pollOp.result
                            token = pollOp.resumeToken
                            assert { token != null }
                            replacement = send!!.pollResult
                            break@loop
                        }
                        failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
                        failure === ALREADY_SELECTED -> {
                            this.size = size // restore size
                            buffer[head] = result // restore head
                            return failure
                        }
                        failure is Closed<*> -> {
                            send = failure
                            token = failure.tryResumeSend(idempotent = null)
                            replacement = failure
                            break@loop
                        }
                        else -> error("performAtomicTrySelect(describeTryPoll) returned $failure")
                    }
                }
            }
            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
                this.size = size // restore size
                // this branch is only reachable with size == capacity, so the index is the slot just vacated at `head`
                buffer[(head + size) % capacity] = replacement
            } else {
                // failed to poll or is already closed --> let's try to select receiving this element from buffer
                if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock
                    this.size = size // restore size
                    buffer[head] = result // restore head
                    return ALREADY_SELECTED
                }
            }
            head = (head + 1) % capacity
        }
        // complete the send that we have taken the replacement from (outside of the lock)
        if (token != null)
            send!!.completeResumeSend(token!!)
        return result
    }

    /**
     * Drops every buffered element and then delegates to the superclass to clean up queued senders.
     *
     * Note: this function is invoked when channel is already closed.
     */
    override fun cleanupSendQueueOnCancel() {
        // clear buffer first
        lock.withLock {
            repeat(size) {
                buffer[head] = null // release the reference, same as the poll paths do
                head = (head + 1) % capacity
            }
            size = 0
        }
        // then clean all queued senders
        super.cleanupSendQueueOnCancel()
    }

    // ------ debug ------

    override val bufferDebugString: String
        get() = "(buffer:capacity=${buffer.size},size=$size)"
}