blob: 7db4d03c2137c148e50ba09ca117563cf9753cb7 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.experimental.io.internal
internal class RingBufferCapacity(private val totalCapacity: Int) {
@Volatile @JvmField
var availableForRead = 0
@Volatile @JvmField
var availableForWrite = totalCapacity
@Volatile @JvmField
var pendingToFlush = 0
// concurrent unsafe!
fun resetForWrite() {
availableForRead = 0
availableForWrite = totalCapacity
pendingToFlush = 0
}
fun resetForRead() {
availableForRead = totalCapacity
availableForWrite = 0
pendingToFlush = 0
}
fun tryReadExact(n: Int): Boolean {
val AvailableForRead = AvailableForRead
while (true) {
val remaining = availableForRead
if (remaining < n) return false
if (AvailableForRead.compareAndSet(this, remaining, remaining - n)) return true
}
}
fun tryReadAtMost(n: Int): Int {
val AvailableForRead = AvailableForRead
while (true) {
val remaining = availableForRead
val delta = minOf(n, remaining)
if (delta == 0) return 0
if (AvailableForRead.compareAndSet(this, remaining, remaining - delta)) return delta
}
}
fun tryWriteAtLeast(n: Int): Int {
val AvailableForWrite = AvailableForWrite
while (true) {
val remaining = availableForWrite
if (remaining < n) return 0
if (AvailableForWrite.compareAndSet(this, remaining, 0)) return remaining
}
}
fun tryWriteExact(n: Int): Boolean {
val AvailableForWrite = AvailableForWrite
while (true) {
val remaining = availableForWrite
if (remaining < n) return false
if (AvailableForWrite.compareAndSet(this, remaining, remaining - n)) return true
}
}
fun tryWriteAtMost(n: Int): Int {
val AvailableForWrite = AvailableForWrite
while (true) {
val remaining = availableForWrite
val delta = minOf(n, remaining)
if (delta == 0) return 0
if (AvailableForWrite.compareAndSet(this, remaining, remaining - delta)) return delta
}
}
fun completeRead(n: Int) {
val totalCapacity = totalCapacity
val AvailableForWrite = AvailableForWrite
while (true) {
val remaining = availableForWrite
val update = remaining + n
if (update > totalCapacity) completeReadOverflow(remaining, update, n)
if (AvailableForWrite.compareAndSet(this, remaining, update)) break
}
}
private fun completeReadOverflow(remaining: Int, update: Int, n: Int): Nothing {
throw IllegalArgumentException("Completed read overflow: $remaining + $n = $update > $totalCapacity")
}
fun completeWrite(n: Int) {
val totalCapacity = totalCapacity
val PendingToFlush = PendingToFlush
while (true) {
val pending = pendingToFlush
val update = pending + n
if (update > totalCapacity) completeReadOverflow(pending, n)
if (PendingToFlush.compareAndSet(this, pending, update)) break
}
}
private fun completeReadOverflow(pending: Int, n: Int): Nothing {
throw IllegalArgumentException("Complete write overflow: $pending + $n > $totalCapacity")
}
/**
* @return true if there are bytes available for read after flush
*/
fun flush(): Boolean {
val AvailableForRead = AvailableForRead
val pending = PendingToFlush.getAndSet(this, 0)
while (true) {
val remaining = availableForRead
val update = remaining + pending
if (remaining == update || AvailableForRead.compareAndSet(this, remaining, update)) {
return update > 0
}
}
}
fun tryLockForRelease(): Boolean {
val AvailableForWrite = AvailableForWrite
while (true) {
val remaining = availableForWrite
if (pendingToFlush > 0 || availableForRead > 0 || remaining != totalCapacity) return false
if (AvailableForWrite.compareAndSet(this, remaining, 0)) return true
}
}
/**
* Make all writers to fail to write any more bytes
* Use only during failure termination
*/
fun forceLockForRelease() {
AvailableForWrite.getAndSet(this, 0)
}
fun isEmpty(): Boolean = availableForWrite == totalCapacity
fun isFull(): Boolean = availableForWrite == 0
companion object {
// todo: replace with atomicfu, remove companion object
private val AvailableForRead = intUpdater(RingBufferCapacity::availableForRead)
private val AvailableForWrite = intUpdater(RingBufferCapacity::availableForWrite)
private val PendingToFlush = intUpdater(RingBufferCapacity::pendingToFlush)
}
}