blob: 803c84beafeb423141efa842bf6a155b1019a611 [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental.channels
import java.util.concurrent.locks.ReentrantLock
/**
* Channel with array buffer of a fixed [capacity].
* Sender suspends only when buffer is fully and receiver suspends only when buffer is empty.
*
* This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
* The lists of suspended senders or receivers are lock-free.
*/
public class ArrayChannel<E>(val capacity: Int) : AbstractChannel<E>() {
init {
check(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
}
private val lock = ReentrantLock()
private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
private var head: Int = 0
@Volatile
private var size: Int = 0
private inline fun <T> locked(block: () -> T): T {
lock.lock()
return try { block() }
finally { lock.unlock() }
}
override val hasBuffer: Boolean get() = true
override val isBufferEmpty: Boolean get() = size == 0
override val isBufferFull: Boolean get() = size == capacity
// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
override fun offerInternal(element: E): Any {
var token: Any? = null
var receive: ReceiveOrClosed<E>? = null
locked {
val size = this.size
closedForSend?.let { return it }
if (size < capacity) {
// tentatively put element to buffer
this.size = size + 1 // update size before checking queue (!!!)
// check for receivers that were waiting on empty queue
if (size == 0) {
while (true) {
receive = takeFirstReceiveOrPeekClosed() ?: break // break when no receivers queued
token = receive!!.tryResumeReceive(element)
if (token != null) {
this.size = size // restore size
return@locked
}
}
}
buffer[(head + size) % capacity] = element // actually queue element
return OFFER_SUCCESS
}
// size == capacity: full
return OFFER_FAILED
}
// breaks here if offer meets receiver
receive!!.completeResumeReceive(token!!)
return receive!!.offerResult
}
// result is `E | POLL_EMPTY | Closed`
override fun pollInternal(): Any? {
var token: Any? = null
var send: Send? = null
var result: Any? = null
locked {
val size = this.size
if (size == 0) return closedForSend ?: POLL_EMPTY
// size > 0: not empty -- retrieve element
result = buffer[head]
buffer[head] = null
this.size = size - 1 // update size before checking queue (!!!)
// check for senders that were waiting on full queue
var replacement: Any? = POLL_EMPTY
if (size == capacity) {
while (true) {
send = takeFirstSendOrPeekClosed() ?: break
token = send!!.tryResumeSend()
if (token != null) {
replacement = send!!.pollResult
break
}
}
}
if (replacement !== POLL_EMPTY && !isClosed(replacement)) {
this.size = size // restore size
buffer[(head + size) % capacity] = replacement
}
head = (head + 1) % capacity
}
// complete send the we're taken replacement from
if (token != null)
send!!.completeResumeSend(token!!)
return result
}
}