blob: 797d224449f9849d69db64033631b1d9e866be06 [file] [log] [blame]
Sergey Mashkov86f70612017-07-25 10:59:44 +03001package kotlinx.coroutines.experimental.io.internal
2
/**
 * Capacity accounting for a ring buffer of [totalCapacity] units, kept in three volatile counters:
 *
 * - [availableForWrite] - units that writers may still reserve (starts at [totalCapacity]);
 * - [pendingToFlush] - units reported as written via [completeWrite] but not yet published to readers;
 * - [availableForRead] - units that readers may reserve (starts at 0).
 *
 * Units move between the counters as follows: `tryWrite*` subtracts from [availableForWrite],
 * [completeWrite] adds to [pendingToFlush], [flush] moves [pendingToFlush] into [availableForRead],
 * `tryRead*` subtracts from [availableForRead], and [completeRead] adds back to [availableForWrite].
 *
 * Each individual counter update is performed with an atomic field updater (compare-and-set retry
 * loop or get-and-set). The `reset*` functions are the exception: they write all three fields
 * separately and must not race with any other operation.
 *
 * None of the functions validates that `n` is non-negative; a negative `n` would move a counter in
 * the opposite direction.
 *
 * NOTE(review): every function copies the companion updater (and [totalCapacity]) into a local
 * before its retry loop. This looks like a deliberate hot-path optimization - confirm before removing.
 */
internal class RingBufferCapacity(private val totalCapacity: Int) {
    @Volatile @JvmField
    var availableForRead = 0

    @Volatile @JvmField
    var availableForWrite = totalCapacity

    @Volatile @JvmField
    var pendingToFlush = 0

    /**
     * Puts the counters into the "empty" state: nothing to read, nothing pending,
     * the whole [totalCapacity] available for write.
     *
     * Concurrent unsafe: the three fields are assigned one by one, not as a single atomic step.
     */
    fun resetForWrite() {
        availableForRead = 0
        availableForWrite = totalCapacity
        pendingToFlush = 0
    }

    /**
     * Puts the counters into the "full" state: the whole [totalCapacity] available for read,
     * nothing pending, nothing available for write.
     *
     * Concurrent unsafe: the three fields are assigned one by one, not as a single atomic step.
     */
    fun resetForRead() {
        availableForRead = totalCapacity
        availableForWrite = 0
        pendingToFlush = 0
    }

    /**
     * Atomically reserves exactly [n] units for reading.
     *
     * @return `true` if [availableForRead] was decreased by [n],
     * `false` (and nothing changed) if fewer than [n] units were available
     */
    fun tryReadExact(n: Int): Boolean {
        val updater = AvailableForRead
        while (true) {
            val remaining = availableForRead
            if (remaining < n) return false
            // CAS failure means another thread changed the counter; re-read and retry
            if (updater.compareAndSet(this, remaining, remaining - n)) return true
        }
    }

    /**
     * Atomically reserves up to [n] units for reading.
     *
     * @return the number of units reserved, i.e. `minOf(n, availableForRead)`; 0 if that minimum is 0
     */
    fun tryReadAtMost(n: Int): Int {
        val updater = AvailableForRead
        while (true) {
            val remaining = availableForRead
            val delta = minOf(n, remaining)
            if (delta == 0) return 0
            if (updater.compareAndSet(this, remaining, remaining - delta)) return delta
        }
    }

    /**
     * Atomically reserves *all* currently writable units, provided there are at least [n] of them.
     *
     * @return the number of units reserved (the whole previous value of [availableForWrite]),
     * or 0 if fewer than [n] units were available
     */
    fun tryWriteAtLeast(n: Int): Int {
        val updater = AvailableForWrite
        while (true) {
            val remaining = availableForWrite
            if (remaining < n) return 0
            // takes everything that is available, not just n
            if (updater.compareAndSet(this, remaining, 0)) return remaining
        }
    }

    /**
     * Atomically reserves exactly [n] units for writing.
     *
     * @return `true` if [availableForWrite] was decreased by [n],
     * `false` (and nothing changed) if fewer than [n] units were available
     */
    fun tryWriteExact(n: Int): Boolean {
        val updater = AvailableForWrite
        while (true) {
            val remaining = availableForWrite
            if (remaining < n) return false
            if (updater.compareAndSet(this, remaining, remaining - n)) return true
        }
    }

    /**
     * Atomically reserves up to [n] units for writing.
     *
     * @return the number of units reserved, i.e. `minOf(n, availableForWrite)`; 0 if that minimum is 0
     */
    fun tryWriteAtMost(n: Int): Int {
        val updater = AvailableForWrite

        while (true) {
            val remaining = availableForWrite
            val delta = minOf(n, remaining)
            if (delta == 0) return 0
            if (updater.compareAndSet(this, remaining, remaining - delta)) return delta
        }
    }

    /**
     * Returns [n] units to [availableForWrite] after they have been read.
     *
     * @throws IllegalArgumentException if the result would exceed [totalCapacity]
     */
    fun completeRead(n: Int) {
        val capacity = totalCapacity
        val updater = AvailableForWrite

        while (true) {
            val remaining = availableForWrite
            val update = remaining + n
            if (update > capacity) completeReadOverflow(remaining, update, n)
            if (updater.compareAndSet(this, remaining, update)) break
        }
    }

    /** Failure path of [completeRead], kept in a separate function; always throws. */
    private fun completeReadOverflow(remaining: Int, update: Int, n: Int): Nothing {
        throw IllegalArgumentException("Completed read overflow: $remaining + $n = $update > $totalCapacity")
    }

    /**
     * Adds [n] written units to [pendingToFlush]; they become readable only after [flush].
     *
     * @throws IllegalArgumentException if the result would exceed [totalCapacity]
     */
    fun completeWrite(n: Int) {
        val capacity = totalCapacity
        val updater = PendingToFlush

        while (true) {
            val pending = pendingToFlush
            val update = pending + n
            if (update > capacity) completeWriteOverflow(pending, n)
            if (updater.compareAndSet(this, pending, update)) break
        }
    }

    /** Failure path of [completeWrite], kept in a separate function; always throws. */
    private fun completeWriteOverflow(pending: Int, n: Int): Nothing {
        throw IllegalArgumentException("Complete write overflow: $pending + $n > $totalCapacity")
    }

    /**
     * Moves everything accumulated in [pendingToFlush] into [availableForRead].
     *
     * @return true if there are bytes available for read after flush
     */
    fun flush(): Boolean {
        val updater = AvailableForRead
        // claim the pending amount first so that a concurrent flush cannot publish it twice
        val pending = PendingToFlush.getAndSet(this, 0)
        while (true) {
            val remaining = availableForRead
            val update = remaining + pending
            // remaining == update means nothing was pending: skip the CAS and just report the state
            if (remaining == update || updater.compareAndSet(this, remaining, update)) {
                return update > 0
            }
        }
    }

    /**
     * Tries to lock the capacity for release by switching [availableForWrite] from
     * [totalCapacity] to 0.
     *
     * @return `true` if the switch succeeded; `false` if anything is pending to flush,
     * available for read, or [availableForWrite] is not equal to [totalCapacity]
     */
    fun tryLockForRelease(): Boolean {
        val updater = AvailableForWrite
        while (true) {
            val remaining = availableForWrite
            if (pendingToFlush > 0 || availableForRead > 0 || remaining != totalCapacity) return false
            if (updater.compareAndSet(this, remaining, 0)) return true
        }
    }

    /**
     * Makes all writers fail to write any more bytes by unconditionally setting
     * [availableForWrite] to 0.
     * Use only during failure termination.
     */
    fun forceLockForRelease() {
        AvailableForWrite.getAndSet(this, 0)
    }

    /** `true` when the whole [totalCapacity] is available for write. */
    fun isEmpty(): Boolean = availableForWrite == totalCapacity

    /** `true` when nothing is available for write. */
    fun isFull(): Boolean = availableForWrite == 0

    companion object {
        // todo: replace with atomicfu, remove companion object
        private val AvailableForRead = intUpdater(RingBufferCapacity::availableForRead)
        private val AvailableForWrite = intUpdater(RingBufferCapacity::availableForWrite)
        private val PendingToFlush = intUpdater(RingBufferCapacity::pendingToFlush)
    }
}