blob: c75f1bb553dc382452b061e31dbd2cc6eba01a2f [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030017package kotlinx.coroutines.experimental.channels
18
Roman Elizarov1216e912017-02-22 09:57:06 +030019import kotlinx.coroutines.experimental.ALREADY_SELECTED
20import kotlinx.coroutines.experimental.selects.SelectInstance
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030021import java.util.concurrent.locks.ReentrantLock
22
/**
 * Channel with array buffer of a fixed [capacity].
 * Sender suspends only when buffer is full and receiver suspends only when buffer is empty.
 *
 * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
 * The lists of suspended senders or receivers are lock-free.
 *
 * The buffer is used as a ring: the oldest element lives at index `head` and a new element is stored at
 * `(head + size) % capacity`.
 */
public class ArrayChannel<E>(
    /**
     * Buffer capacity.
     */
    val capacity: Int
) : AbstractChannel<E>() {
    init {
        // This block is declared before the `buffer` property, so an invalid capacity is rejected
        // before the backing array is allocated.
        // NOTE(review): `require` is the conventional precondition for a constructor argument; `check` is kept
        // so that the exception type seen by existing callers (IllegalStateException) does not change.
        check(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
    }

    private val lock = ReentrantLock()
    private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
    private var head: Int = 0 // index of the oldest buffered element
    @Volatile
    private var size: Int = 0 // written only under `lock`; read without it by isBufferEmpty/isBufferFull

    /**
     * Runs [block] while holding [lock]; the lock is released in `finally`, including when [block]
     * leaves through a non-local `return`.
     */
    private inline fun <T> locked(block: () -> T): T {
        lock.lock()
        return try { block() }
        finally { lock.unlock() }
    }

    override val hasBuffer: Boolean get() = true
    override val isBufferEmpty: Boolean get() = size == 0
    override val isBufferFull: Boolean get() = size == capacity

    /**
     * Tries to offer [element] without suspension.
     *
     * If the buffer was empty, queued receivers are tried first and the element is handed directly to the
     * first one whose `tryResumeReceive` yields a token; otherwise the element is stored in the buffer.
     *
     * Result is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
     */
    override fun offerInternal(element: E): Any {
        var receive: ReceiveOrClosed<E>? = null
        var token: Any? = null
        locked {
            val size = this.size
            closedForSend?.let { return it }
            if (size < capacity) {
                // tentatively put element to buffer
                this.size = size + 1 // update size before checking queue (!!!)
                // check for receivers that were waiting on empty queue
                if (size == 0) {
                    loop@ while (true) {
                        receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
                        if (receive is Closed) {
                            this.size = size // restore size
                            return receive!!
                        }
                        token = receive!!.tryResumeReceive(element, idempotent = null)
                        if (token != null) {
                            // element goes straight to the receiver and never enters the buffer
                            this.size = size // restore size
                            return@locked
                        }
                    }
                }
                buffer[(head + size) % capacity] = element // actually queue element
                return OFFER_SUCCESS
            }
            // size == capacity: full
            return OFFER_FAILED
        }
        // breaks here if offer meets receiver; resumption is completed outside of the lock
        receive!!.completeResumeReceive(token!!)
        return receive!!.offerResult
    }

    /**
     * Variant of [offerInternal] used from a select expression: the hand-off to a waiting receiver is
     * performed through `select.performAtomicTrySelect`, and storing into the buffer is guarded by
     * `select.trySelect`.
     *
     * Result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
     */
    override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        var receive: ReceiveOrClosed<E>? = null
        var token: Any? = null
        locked {
            val size = this.size
            closedForSend?.let { return it }
            if (size < capacity) {
                // tentatively put element to buffer
                this.size = size + 1 // update size before checking queue (!!!)
                // check for receivers that were waiting on empty queue
                if (size == 0) {
                    loop@ while (true) {
                        val offerOp = describeTryOffer(element)
                        val failure = select.performAtomicTrySelect(offerOp)
                        when {
                            failure == null -> { // offered successfully
                                this.size = size // restore size
                                receive = offerOp.result
                                token = offerOp.resumeToken
                                check(token != null)
                                return@locked
                            }
                            failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
                            failure === ALREADY_SELECTED || failure is Closed<*> -> {
                                this.size = size // restore size
                                return failure
                            }
                            else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
                        }
                    }
                }
                // let's try to select sending this element to buffer
                if (!select.trySelect(null)) {
                    this.size = size // restore size
                    return ALREADY_SELECTED
                }
                buffer[(head + size) % capacity] = element // actually queue element
                return OFFER_SUCCESS
            }
            // size == capacity: full
            return OFFER_FAILED
        }
        // breaks here if offer meets receiver; resumption is completed outside of the lock
        receive!!.completeResumeReceive(token!!)
        return receive!!.offerResult
    }

    /**
     * Tries to poll an element without suspension.
     *
     * The element at `head` is removed. If the buffer was full, queued senders are tried and the
     * `pollResult` of the first one whose `tryResumeSend` yields a token is stored into the buffer as a
     * replacement, so the buffer stays full.
     *
     * Result is `E | POLL_FAILED | Closed`.
     */
    override fun pollInternal(): Any? {
        var send: Send? = null
        var token: Any? = null
        var result: Any? = null
        locked {
            val size = this.size
            if (size == 0) return closedForSend ?: POLL_FAILED
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null
            this.size = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_FAILED
            if (size == capacity) {
                loop@ while (true) {
                    send = takeFirstSendOrPeekClosed() ?: break
                    token = send!!.tryResumeSend(idempotent = null)
                    if (token != null) {
                        replacement = send!!.pollResult
                        break@loop
                    }
                }
            }
            if (replacement !== POLL_FAILED && !isClosed(replacement)) {
                this.size = size // restore size
                // a replacement exists only when size == capacity, so this index is the slot just vacated at `head`
                buffer[(head + size) % capacity] = replacement
            }
            head = (head + 1) % capacity
        }
        // complete the send that we've taken the replacement from (outside of the lock)
        if (token != null)
            send!!.completeResumeSend(token!!)
        return result
    }

    /**
     * Variant of [pollInternal] used from a select expression: taking a replacement from a waiting sender
     * is performed through `select.performAtomicTrySelect`; when no replacement is taken, receiving from
     * the buffer is guarded by `select.trySelect`. Whenever the select turns out to be already selected,
     * the removed element and the size are put back.
     *
     * Result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`.
     */
    override fun pollSelectInternal(select: SelectInstance<*>): Any? {
        var send: Send? = null
        var token: Any? = null
        var result: Any? = null
        locked {
            val size = this.size
            if (size == 0) return closedForSend ?: POLL_FAILED
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null
            this.size = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_FAILED
            if (size == capacity) {
                loop@ while (true) {
                    val pollOp = describeTryPoll()
                    val failure = select.performAtomicTrySelect(pollOp)
                    when {
                        failure == null -> { // polled successfully
                            send = pollOp.result
                            token = pollOp.resumeToken
                            check(token != null)
                            replacement = send!!.pollResult
                            break@loop
                        }
                        failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
                        failure === ALREADY_SELECTED -> {
                            this.size = size // restore size
                            buffer[head] = result // restore head
                            return failure
                        }
                        failure is Closed<*> -> {
                            send = failure
                            token = failure.tryResumeSend(idempotent = null)
                            replacement = failure
                            break@loop
                        }
                        // the operation performed above is describeTryPoll, so report that one
                        else -> error("performAtomicTrySelect(describeTryPoll) returned $failure")
                    }
                }
            }
            if (replacement !== POLL_FAILED && !isClosed(replacement)) {
                this.size = size // restore size
                // a replacement exists only when size == capacity, so this index is the slot just vacated at `head`
                buffer[(head + size) % capacity] = replacement
            } else {
                // failed to poll or is already closed --> let's try to select receiving this element from buffer
                if (!select.trySelect(null)) {
                    this.size = size // restore size
                    buffer[head] = result // restore head
                    return ALREADY_SELECTED
                }
            }
            head = (head + 1) % capacity
        }
        // complete the send that we've taken the replacement from (outside of the lock)
        if (token != null)
            send!!.completeResumeSend(token!!)
        return result
    }
}