blob: 9e46e249f0516c7280c542dca3082d2864cd2b49 [file] [log] [blame]
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03001/*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
Sergey Mashkov86fe5912017-07-25 11:20:01 +03005@file:Suppress("UsePropertyAccessSyntax") // for ByteBuffer.getShort/getInt/etc
6
Sergey Mashkov86f70612017-07-25 10:59:44 +03007package kotlinx.coroutines.experimental.io
8
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03009import kotlinx.atomicfu.*
Sergey Mashkov6d059932017-10-06 18:46:23 +030010import kotlinx.coroutines.experimental.*
11import kotlinx.coroutines.experimental.channels.*
Sergey Mashkov86f70612017-07-25 10:59:44 +030012import kotlinx.coroutines.experimental.io.internal.*
Sergey Mashkova10b6922017-08-03 13:41:54 +030013import kotlinx.coroutines.experimental.io.packet.*
Sergey Mashkov6d059932017-10-06 18:46:23 +030014import kotlinx.io.core.*
15import kotlinx.io.core.ByteReadPacket
16import kotlinx.io.pool.*
17import java.io.EOFException
18import java.nio.*
19import java.util.concurrent.atomic.*
Sergey Mashkov09b621e2017-12-22 20:42:18 +030020import kotlin.coroutines.experimental.*
21import kotlin.coroutines.experimental.intrinsics.*
Sergey Mashkov86f70612017-07-25 10:59:44 +030022
Roman Elizarov339ccf32017-09-27 17:44:00 +030023internal const val DEFAULT_CLOSE_MESSAGE = "Byte channel was closed"
24
Roman Elizarov469cad32017-08-15 15:54:56 +030025// implementation for ByteChannel
26internal class ByteBufferChannel(
Sergey Mashkov6d059932017-10-06 18:46:23 +030027 override val autoFlush: Boolean,
28 private val pool: ObjectPool<ReadWriteBufferState.Initial> = BufferObjectPool,
29 private val reservedSize: Int = RESERVED_SIZE
Sergey Mashkov4cd00142017-09-05 19:25:32 +030030) : ByteChannel, LookAheadSuspendSession {
Roman Elizarov469cad32017-08-15 15:54:56 +030031 // internal constructor for reading of byte buffers
Roman Elizarov83762032017-08-08 19:58:40 +030032 constructor(content: ByteBuffer) : this(false, BufferObjectNoPool, 0) {
Sergey Mashkov569b5422017-07-25 12:17:11 +030033 state = ReadWriteBufferState.Initial(content.slice(), 0).apply {
34 capacity.resetForRead()
35 }.startWriting()
36 restoreStateAfterWrite()
37 close()
38 tryTerminate()
39 }
Sergey Mashkov86f70612017-07-25 10:59:44 +030040
41 @Volatile
Sergey Mashkov86fe5912017-07-25 11:20:01 +030042 private var state: ReadWriteBufferState = ReadWriteBufferState.IdleEmpty
Sergey Mashkov86f70612017-07-25 10:59:44 +030043
44 @Volatile
Sergey Mashkov86fe5912017-07-25 11:20:01 +030045 private var closed: ClosedElement? = null
Sergey Mashkov86f70612017-07-25 10:59:44 +030046
47 @Volatile
Sergey Mashkovfd85fb92017-11-08 22:27:18 +030048 private var joining: JoiningState? = null
Sergey Mashkovc51ecd32017-10-27 22:42:39 +030049
50 @Volatile
Sergey Mashkov09b621e2017-12-22 20:42:18 +030051 private var readOp: Continuation<Boolean>? = null
Sergey Mashkov86f70612017-07-25 10:59:44 +030052
53 @Volatile
Sergey Mashkov09b621e2017-12-22 20:42:18 +030054 private var writeOp: Continuation<Unit>? = null
Sergey Mashkov86f70612017-07-25 10:59:44 +030055
56 private var readPosition = 0
57 private var writePosition = 0
58
Sergey Mashkova26b7902017-12-05 16:06:03 +030059 @Volatile
60 private var attachedJob: Job? = null
61
62 internal fun attachJob(job: Job) {
63 attachedJob?.cancel()
64 attachedJob = job
Sergey Mashkov09b621e2017-12-22 20:42:18 +030065 job.invokeOnCompletion(onCancelling = true) { cause ->
Sergey Mashkova26b7902017-12-05 16:06:03 +030066 attachedJob = null
Sergey Mashkov09b621e2017-12-22 20:42:18 +030067 if (cause != null) cancel(cause)
Sergey Mashkova26b7902017-12-05 16:06:03 +030068 }
69 }
70
Sergey Mashkov86f70612017-07-25 10:59:44 +030071 override var readByteOrder: ByteOrder = ByteOrder.BIG_ENDIAN
72 override var writeByteOrder: ByteOrder = ByteOrder.BIG_ENDIAN
Sergey Mashkov8af89372017-12-05 12:35:56 +030073 set(newOrder) {
74 if (field != newOrder) {
75 field = newOrder
76 joining?.delegatedTo?.writeByteOrder = newOrder
77 }
78 }
Sergey Mashkov86f70612017-07-25 10:59:44 +030079
Roman Elizarov469cad32017-08-15 15:54:56 +030080 override val availableForRead: Int
81 get() = state.capacity.availableForRead
Sergey Mashkov86f70612017-07-25 10:59:44 +030082
Roman Elizarov469cad32017-08-15 15:54:56 +030083 override val availableForWrite: Int
84 get() = state.capacity.availableForWrite
85
86 override val isClosedForRead: Boolean
Sergey Mashkovfd85fb92017-11-08 22:27:18 +030087 get() = state === ReadWriteBufferState.Terminated && closed != null
Sergey Mashkov86f70612017-07-25 10:59:44 +030088
Roman Elizarov469cad32017-08-15 15:54:56 +030089 override val isClosedForWrite: Boolean
90 get() = closed != null
91
Sergey Mashkovaae062d2017-09-07 15:12:50 +030092 @Volatile
93 override var totalBytesRead: Long = 0L
94 private set
95
96 @Volatile
97 override var totalBytesWritten: Long = 0L
98 private set
99
Sergey Mashkov8fbdff42018-02-02 17:17:55 +0300100 override val closedCause: Throwable?
101 get() = closed?.cause
102
Sergey Mashkov86f70612017-07-25 10:59:44 +0300103 override fun close(cause: Throwable?): Boolean {
104 if (closed != null) return false
Sergey Mashkov86f70612017-07-25 10:59:44 +0300105 val newClosed = if (cause == null) ClosedElement.EmptyCause else ClosedElement(cause)
Sergey Mashkovad68c062017-10-04 21:34:18 +0300106 state.capacity.flush()
Roman Elizarov469cad32017-08-15 15:54:56 +0300107 if (!Closed.compareAndSet(this, null, newClosed)) return false
Sergey Mashkov8e132412017-09-11 12:39:46 +0300108 state.capacity.flush()
Sergey Mashkov24b040f2017-08-25 11:06:59 +0300109 if (state.capacity.isEmpty() || cause != null) tryTerminate()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300110 resumeClosed(cause)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300111
112 if (state === ReadWriteBufferState.Terminated) {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300113 joining?.let { ensureClosedJoined(it) }
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300114 }
115
Sergey Mashkova26b7902017-12-05 16:06:03 +0300116 if (cause != null) attachedJob?.cancel(cause)
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +0300117// readSuspendContinuationCache.close()
118// writeSuspendContinuationCache.close()
Sergey Mashkova26b7902017-12-05 16:06:03 +0300119
Sergey Mashkov86f70612017-07-25 10:59:44 +0300120 return true
121 }
122
Sergey Mashkova26b7902017-12-05 16:06:03 +0300123 override fun cancel(cause: Throwable?): Boolean {
Sergey Mashkoveee3aaf2017-12-05 15:51:55 +0300124 return close(cause ?: CancellationException("Channel has been cancelled"))
125 }
126
Sergey Mashkove6764082017-12-05 12:27:09 +0300127 private fun flushImpl(minReadSize: Int, minWriteSize: Int) {
Sergey Mashkov33d5d132017-11-10 10:28:57 +0300128 joining?.delegatedTo?.flush()
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300129
Sergey Mashkov625c1fe2017-11-08 06:15:39 +0300130 val avw: Int
Sergey Mashkove6764082017-12-05 12:27:09 +0300131 val avr: Int
Sergey Mashkov8e9138c2017-11-09 21:18:05 +0300132
Sergey Mashkov625c1fe2017-11-08 06:15:39 +0300133 while (true) {
134 val s = state
Sergey Mashkov33d5d132017-11-10 10:28:57 +0300135 if (s === ReadWriteBufferState.Terminated) return
Sergey Mashkove6764082017-12-05 12:27:09 +0300136 s.capacity.flush()
Sergey Mashkov625c1fe2017-11-08 06:15:39 +0300137 if (s === state) {
138 avw = s.capacity.availableForWrite
Sergey Mashkove6764082017-12-05 12:27:09 +0300139 avr = s.capacity.availableForRead
Sergey Mashkov625c1fe2017-11-08 06:15:39 +0300140 break
141 }
142 }
143
Sergey Mashkove6764082017-12-05 12:27:09 +0300144 if (avr >= minReadSize) resumeReadOp()
Sergey Mashkovec2af2f2018-01-15 17:02:25 +0300145 val joining = joining
146 if (avw >= minWriteSize && (joining == null || state === ReadWriteBufferState.Terminated)) resumeWriteOp()
Sergey Mashkove6764082017-12-05 12:27:09 +0300147 }
Sergey Mashkov33d5d132017-11-10 10:28:57 +0300148
Sergey Mashkove6764082017-12-05 12:27:09 +0300149 override fun flush() {
150 flushImpl(1, 1)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300151 }
152
153 private fun ByteBuffer.prepareBuffer(order: ByteOrder, position: Int, available: Int) {
154 require(position >= 0)
155 require(available >= 0)
156
Roman Elizarov83762032017-08-08 19:58:40 +0300157 val bufferLimit = capacity() - reservedSize
Sergey Mashkov86f70612017-07-25 10:59:44 +0300158 val virtualLimit = position + available
159
Sergey Mashkovf774f372017-10-09 12:09:16 +0300160 order(order.nioOrder)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300161 limit(virtualLimit.coerceAtMost(bufferLimit))
162 position(position)
163 }
164
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300165 private fun setupStateForWrite(): ByteBuffer? {
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +0300166 writeOp?.let { existing ->
167 throw IllegalStateException("Write operation is already in progress: $existing")
Sergey Mashkov09b621e2017-12-22 20:42:18 +0300168 }
Sergey Mashkov62deb432017-11-09 19:50:26 +0300169
Sergey Mashkov86f70612017-07-25 10:59:44 +0300170 var _allocated: ReadWriteBufferState.Initial? = null
Sergey Mashkovd7d9e2f2018-03-16 00:01:23 +0300171 var old: ReadWriteBufferState? = null
172
173 val newState = updateStateAndGet { state ->
174 old = state
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300175 when {
Sergey Mashkov98c87392017-11-20 17:58:28 +0300176 joining != null -> {
177 _allocated?.let { releaseBuffer(it) }
178 return null
179 }
180 closed != null -> {
181 _allocated?.let { releaseBuffer(it) }
182 throw closed!!.sendException
183 }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300184 state === ReadWriteBufferState.IdleEmpty -> {
185 val allocated = _allocated ?: newBuffer().also { _allocated = it }
186 allocated.startWriting()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300187 }
Sergey Mashkov98c87392017-11-20 17:58:28 +0300188 state === ReadWriteBufferState.Terminated -> {
189 _allocated?.let { releaseBuffer(it) }
190 if (joining != null) return null
191 throw closed!!.sendException
192 }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300193 else -> state.startWriting()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300194 }
195 }
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300196
Sergey Mashkov18b28a22017-11-20 10:53:32 +0300197 if (closed != null) {
198 restoreStateAfterWrite()
199 tryTerminate()
200
201 throw closed!!.sendException
202 }
203
Sergey Mashkov86f70612017-07-25 10:59:44 +0300204 val buffer = newState.writeBuffer
205
206 _allocated?.let { allocated ->
207 if (old !== ReadWriteBufferState.IdleEmpty) {
208 releaseBuffer(allocated)
209 }
210 }
Sergey Mashkov86f70612017-07-25 10:59:44 +0300211 return buffer.apply {
Roman Elizarov469cad32017-08-15 15:54:56 +0300212 prepareBuffer(writeByteOrder, writePosition, newState.capacity.availableForWrite)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300213 }
214 }
215
216 private fun restoreStateAfterWrite() {
217 var toRelease: ReadWriteBufferState.IdleNonEmpty? = null
218
Sergey Mashkovd7d9e2f2018-03-16 00:01:23 +0300219 val newState = updateStateAndGet {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300220 val writeStopped = it.stopWriting()
221 if (writeStopped is ReadWriteBufferState.IdleNonEmpty && writeStopped.capacity.isEmpty()) {
222 toRelease = writeStopped
223 ReadWriteBufferState.IdleEmpty
224 } else {
225 writeStopped
226 }
227 }
228
229 if (newState === ReadWriteBufferState.IdleEmpty) {
230 toRelease?.let { releaseBuffer(it.initial) }
231 }
232 }
233
234 private fun setupStateForRead(): ByteBuffer? {
Sergey Mashkov39a88542017-12-19 18:41:34 +0300235 val newState = updateStateAndGet { state ->
Sergey Mashkov86f70612017-07-25 10:59:44 +0300236 when (state) {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300237 ReadWriteBufferState.Terminated -> closed?.cause?.let { throw it } ?: return null
Sergey Mashkov86f70612017-07-25 10:59:44 +0300238 ReadWriteBufferState.IdleEmpty -> closed?.cause?.let { throw it } ?: return null
239 else -> {
Roman Elizarov469cad32017-08-15 15:54:56 +0300240 if (state.capacity.availableForRead == 0) return null
Sergey Mashkov86f70612017-07-25 10:59:44 +0300241 state.startReading()
242 }
243 }
244 }
245
246 return newState.readBuffer.apply {
Roman Elizarov469cad32017-08-15 15:54:56 +0300247 prepareBuffer(readByteOrder, readPosition, newState.capacity.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300248 }
249 }
250
251 private fun restoreStateAfterRead() {
252 var toRelease: ReadWriteBufferState.IdleNonEmpty? = null
253
Sergey Mashkov39a88542017-12-19 18:41:34 +0300254 val newState = updateStateAndGet { state ->
Sergey Mashkov86f70612017-07-25 10:59:44 +0300255 toRelease?.let {
Roman Elizarov469cad32017-08-15 15:54:56 +0300256 it.capacity.resetForWrite()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300257 resumeWriteOp()
258 toRelease = null
259 }
260
261 val readStopped = state.stopReading()
262
263 if (readStopped is ReadWriteBufferState.IdleNonEmpty) {
264 if (this.state === state && readStopped.capacity.tryLockForRelease()) {
265 toRelease = readStopped
266 ReadWriteBufferState.IdleEmpty
267 } else {
268 readStopped
269 }
270 } else {
271 readStopped
272 }
273 }
274
275 if (newState === ReadWriteBufferState.IdleEmpty) {
276 toRelease?.let { releaseBuffer(it.initial) }
277 resumeWriteOp()
Sergey Mashkovd1842c62018-01-23 23:16:54 +0300278 } else {
279 if (newState is ReadWriteBufferState.IdleNonEmpty && newState.capacity.isEmpty()) {
280 if (newState.capacity.tryLockForRelease() && State.compareAndSet(this, newState, ReadWriteBufferState.IdleEmpty)) {
281 newState.capacity.resetForWrite()
282 releaseBuffer(newState.initial)
283 resumeWriteOp()
284 }
285 }
Sergey Mashkov86f70612017-07-25 10:59:44 +0300286 }
287 }
288
Sergey Mashkov78317162018-03-21 10:12:40 +0300289 private fun setupDelegateTo(delegate: ByteBufferChannel, delegateClose: Boolean, delegateFlush: Boolean): JoiningState {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300290 require(this !== delegate)
291
Sergey Mashkov78317162018-03-21 10:12:40 +0300292 val joined = JoiningState(delegate, delegateClose, delegateFlush)
Sergey Mashkov8af89372017-12-05 12:35:56 +0300293 delegate.writeByteOrder = writeByteOrder
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300294 this.joining = joined
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300295
296 val alreadyClosed = closed
297 if (alreadyClosed != null) {
298 if (alreadyClosed.cause != null) delegate.close(alreadyClosed.cause)
Sergey Mashkov9e9929d2017-11-14 14:52:32 +0300299 else if (delegateClose && state === ReadWriteBufferState.Terminated) delegate.close()
Sergey Mashkov78317162018-03-21 10:12:40 +0300300 else if (delegateFlush) delegate.flush()
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300301 } else {
302 flush()
303 }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300304
305 return joined
306 }
307
308 private fun tryCompleteJoining(joined: JoiningState): Boolean {
Sergey Mashkovd1842c62018-01-23 23:16:54 +0300309 if (!tryReleaseBuffer(true)) return false
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300310 ensureClosedJoined(joined)
311
Sergey Mashkov060f36a2017-11-23 12:39:27 +0300312 resumeReadOp { IllegalStateException("Joining is in progress") }
Sergey Mashkov98c87392017-11-20 17:58:28 +0300313 resumeWriteOp() // here we don't resume it with exception because it should resume and delegate writing
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300314
315 return true
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300316 }
317
318 private fun tryTerminate(): Boolean {
Sergey Mashkov98c87392017-11-20 17:58:28 +0300319 if (closed == null) return false
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300320
Sergey Mashkovd1842c62018-01-23 23:16:54 +0300321 if (!tryReleaseBuffer(false)) return false
Sergey Mashkov98c87392017-11-20 17:58:28 +0300322
323 joining?.let { ensureClosedJoined(it) }
324
325 resumeReadOp()
326 resumeWriteOp()
327
328 return true
329 }
330
Sergey Mashkovd1842c62018-01-23 23:16:54 +0300331 private fun tryReleaseBuffer(forceTermination: Boolean): Boolean {
Sergey Mashkov24b040f2017-08-25 11:06:59 +0300332 var toRelease: ReadWriteBufferState.Initial? = null
333
Sergey Mashkovd7d9e2f2018-03-16 00:01:23 +0300334 updateStateAndGet { state ->
Sergey Mashkov98c87392017-11-20 17:58:28 +0300335 toRelease?.let { buffer ->
336 toRelease = null
337 buffer.capacity.resetForWrite()
338 resumeWriteOp()
339 }
340 val closed = closed
341
Roman Elizarov469cad32017-08-15 15:54:56 +0300342 when {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300343 state === ReadWriteBufferState.Terminated -> return true
Roman Elizarov469cad32017-08-15 15:54:56 +0300344 state === ReadWriteBufferState.IdleEmpty -> ReadWriteBufferState.Terminated
Sergey Mashkov98c87392017-11-20 17:58:28 +0300345 closed != null && state is ReadWriteBufferState.IdleNonEmpty && (state.capacity.tryLockForRelease() || closed.cause != null) -> {
346 if (closed.cause != null) state.capacity.forceLockForRelease()
Sergey Mashkov24b040f2017-08-25 11:06:59 +0300347 toRelease = state.initial
348 ReadWriteBufferState.Terminated
349 }
Sergey Mashkovd1842c62018-01-23 23:16:54 +0300350 forceTermination && state is ReadWriteBufferState.IdleNonEmpty && state.capacity.tryLockForRelease() -> {
351 toRelease = state.initial
352 ReadWriteBufferState.Terminated
353 }
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300354 else -> return false
Sergey Mashkov86f70612017-07-25 10:59:44 +0300355 }
Roman Elizarov469cad32017-08-15 15:54:56 +0300356 }
Sergey Mashkov24b040f2017-08-25 11:06:59 +0300357
358 toRelease?.let { buffer ->
359 if (state === ReadWriteBufferState.Terminated) {
360 releaseBuffer(buffer)
361 }
362 }
363
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300364 return true
Sergey Mashkov86f70612017-07-25 10:59:44 +0300365 }
366
Roman Elizarov83762032017-08-08 19:58:40 +0300367 private fun ByteBuffer.carryIndex(idx: Int) = if (idx >= capacity() - reservedSize) idx - (capacity() - reservedSize) else idx
Sergey Mashkov86f70612017-07-25 10:59:44 +0300368
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300369 private inline fun writing(block: ByteBufferChannel.(ByteBuffer, RingBufferCapacity) -> Unit) {
Sergey Mashkov4fe8eec2017-11-17 13:33:56 +0300370 val current = joining?.let { resolveDelegation(this, it) } ?: this
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300371 val buffer = current.setupStateForWrite() ?: return
372 val capacity = current.state.capacity
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +0300373 val before = current.totalBytesWritten
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300374
Sergey Mashkov86f70612017-07-25 10:59:44 +0300375 try {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300376 current.closed?.let { throw it.sendException }
377 block(current, buffer, capacity)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300378 } finally {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300379 if (capacity.isFull() || current.autoFlush) current.flush()
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +0300380 if (current !== this) {
381 totalBytesWritten += current.totalBytesWritten - before
382 }
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300383 current.restoreStateAfterWrite()
384 current.tryTerminate()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300385 }
386 }
387
388 private inline fun reading(block: ByteBuffer.(RingBufferCapacity) -> Boolean): Boolean {
389 val buffer = setupStateForRead() ?: return false
390 val capacity = state.capacity
Sergey Mashkov86f70612017-07-25 10:59:44 +0300391 try {
Roman Elizarov469cad32017-08-15 15:54:56 +0300392 if (capacity.availableForRead == 0) return false
Sergey Mashkov86f70612017-07-25 10:59:44 +0300393
394 return block(buffer, capacity)
395 } finally {
396 restoreStateAfterRead()
397 tryTerminate()
398 }
399 }
400
Sergey Mashkovdfae5e22017-12-18 19:31:27 +0300401 private fun readAsMuchAsPossible(dst: ByteBuffer): Int {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300402 var consumed = 0
403
Sergey Mashkovdfae5e22017-12-18 19:31:27 +0300404 reading { state ->
405 val buffer = this
406 val bufferLimit = buffer.capacity() - reservedSize
Sergey Mashkov0f7e9222017-10-10 13:00:38 +0300407
Sergey Mashkovdfae5e22017-12-18 19:31:27 +0300408 while (true) {
409 val dstRemaining = dst.remaining()
410 if (dstRemaining == 0) break
411
412 val position = readPosition
413 val bufferRemaining = bufferLimit - position
414
415 val part = state.tryReadAtMost(minOf(bufferRemaining, dstRemaining))
416 if (part == 0) break
417
418 buffer.limit(position + part)
419 buffer.position(position)
420 dst.put(buffer)
421
422 bytesRead(state, part)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300423 consumed += part
Sergey Mashkov86f70612017-07-25 10:59:44 +0300424 }
Sergey Mashkovdfae5e22017-12-18 19:31:27 +0300425
426 false
Sergey Mashkov86f70612017-07-25 10:59:44 +0300427 }
428
Sergey Mashkovdfae5e22017-12-18 19:31:27 +0300429 return consumed
Sergey Mashkov86f70612017-07-25 10:59:44 +0300430 }
431
Sergey Mashkovc4694cc2017-10-09 12:51:34 +0300432 private tailrec fun readAsMuchAsPossible(dst: BufferView, consumed0: Int = 0): Int {
433 var consumed = 0
434
435 val rc = reading {
436 val dstSize = dst.writeRemaining
437 val part = it.tryReadAtMost(minOf(remaining(), dstSize))
438 if (part > 0) {
439 consumed += part
440
441 if (dstSize < remaining()) {
442 limit(position() + dstSize)
443 }
444 dst.write(this)
445
446 bytesRead(it, part)
447 true
448 } else {
449 false
450 }
451 }
452
453 return if (rc && dst.canWrite() && state.capacity.availableForRead > 0)
454 readAsMuchAsPossible(dst, consumed0 + consumed)
455 else consumed + consumed0
456 }
457
458
Sergey Mashkovdfae5e22017-12-18 19:31:27 +0300459 private fun readAsMuchAsPossible(dst: ByteArray, offset: Int, length: Int): Int {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300460 var consumed = 0
461
Sergey Mashkovdfae5e22017-12-18 19:31:27 +0300462 reading { state ->
463 val buffer = this
464 val bufferLimit = buffer.capacity() - reservedSize
Sergey Mashkov86f70612017-07-25 10:59:44 +0300465
Sergey Mashkovdfae5e22017-12-18 19:31:27 +0300466 while (true) {
467 val lengthRemaining = length - consumed
468 if (lengthRemaining == 0) break
469 val position = readPosition
470 val bufferRemaining = bufferLimit - position
471
472 val part = state.tryReadAtMost(minOf(bufferRemaining, lengthRemaining))
473 if (part == 0) break
474
475 buffer.limit(position + part)
476 buffer.position(position)
477 buffer.get(dst, offset + consumed, part)
478
479 bytesRead(state, part)
480 consumed += part
Sergey Mashkov86f70612017-07-25 10:59:44 +0300481 }
Sergey Mashkovdfae5e22017-12-18 19:31:27 +0300482
483 false
Sergey Mashkov86f70612017-07-25 10:59:44 +0300484 }
485
Sergey Mashkovdfae5e22017-12-18 19:31:27 +0300486 return consumed
Sergey Mashkov86f70612017-07-25 10:59:44 +0300487 }
488
Sergey Mashkovd1842c62018-01-23 23:16:54 +0300489 final override suspend fun readFully(dst: ByteArray, offset: Int, length: Int) {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300490 val consumed = readAsMuchAsPossible(dst, offset, length)
491
492 if (consumed < length) {
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300493 return readFullySuspend(dst, offset + consumed, length - consumed)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300494 }
495 }
496
Sergey Mashkovd1842c62018-01-23 23:16:54 +0300497 final override suspend fun readFully(dst: ByteBuffer): Int {
Sergey Mashkov6ca9a272017-07-25 20:10:27 +0300498 val rc = readAsMuchAsPossible(dst)
499 if (!dst.hasRemaining()) return rc
Sergey Mashkov86f70612017-07-25 10:59:44 +0300500
Sergey Mashkov6ca9a272017-07-25 20:10:27 +0300501 return readFullySuspend(dst, rc)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300502 }
503
Sergey Mashkov6ca9a272017-07-25 20:10:27 +0300504 private suspend fun readFullySuspend(dst: ByteBuffer, rc0: Int): Int {
505 var copied = rc0
506
Sergey Mashkov86f70612017-07-25 10:59:44 +0300507 while (dst.hasRemaining()) {
508 if (!readSuspend(1)) throw ClosedReceiveChannelException("Unexpected EOF: expected ${dst.remaining()} more bytes")
Sergey Mashkov6ca9a272017-07-25 20:10:27 +0300509 copied += readAsMuchAsPossible(dst)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300510 }
Sergey Mashkov6ca9a272017-07-25 20:10:27 +0300511
512 return copied
Sergey Mashkov86f70612017-07-25 10:59:44 +0300513 }
514
Sergey Mashkovd1842c62018-01-23 23:16:54 +0300515 private tailrec suspend fun readFullySuspend(dst: ByteArray, offset: Int, length: Int) {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300516 if (!readSuspend(1)) throw ClosedReceiveChannelException("Unexpected EOF: expected $length more bytes")
517
518 val consumed = readAsMuchAsPossible(dst, offset, length)
519
520 if (consumed < length) {
521 readFullySuspend(dst, offset + consumed, length - consumed)
522 }
523 }
524
Roman Elizarov469cad32017-08-15 15:54:56 +0300525 suspend override fun readAvailable(dst: ByteArray, offset: Int, length: Int): Int {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300526 val consumed = readAsMuchAsPossible(dst, offset, length)
527
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300528 if (consumed == 0 && closed != null) {
Sergey Mashkov4eccfba2017-10-10 13:54:47 +0300529 if (state.capacity.flush()) {
530 return readAsMuchAsPossible(dst, offset, length)
531 } else {
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300532 return -1
Sergey Mashkove4d3ce72017-09-11 10:31:08 +0300533 }
Sergey Mashkovb87d6362017-08-23 11:58:06 +0300534 }
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300535 else if (consumed > 0 || length == 0) return consumed
536
537 return readAvailableSuspend(dst, offset, length)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300538 }
539
Roman Elizarov469cad32017-08-15 15:54:56 +0300540 suspend override fun readAvailable(dst: ByteBuffer): Int {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300541 val consumed = readAsMuchAsPossible(dst)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300542
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300543 if (consumed == 0 && closed != null) {
Sergey Mashkov4eccfba2017-10-10 13:54:47 +0300544 if (state.capacity.flush()) {
545 return readAsMuchAsPossible(dst)
546 } else {
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300547 return -1
Sergey Mashkove4d3ce72017-09-11 10:31:08 +0300548 }
Sergey Mashkovb87d6362017-08-23 11:58:06 +0300549 }
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300550 else if (consumed > 0 || !dst.hasRemaining()) return consumed
551
552 return readAvailableSuspend(dst)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300553 }
554
Sergey Mashkovc4694cc2017-10-09 12:51:34 +0300555 suspend override fun readAvailable(dst: BufferView): Int {
556 val consumed = readAsMuchAsPossible(dst)
557
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300558 if (consumed == 0 && closed != null) {
Sergey Mashkov4eccfba2017-10-10 13:54:47 +0300559 if (state.capacity.flush()) {
560 return readAsMuchAsPossible(dst)
561 } else {
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300562 return -1
Sergey Mashkovc4694cc2017-10-09 12:51:34 +0300563 }
Sergey Mashkovc4694cc2017-10-09 12:51:34 +0300564 }
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300565 else if (consumed > 0 || !dst.canWrite()) return consumed
566
567 return readAvailableSuspend(dst)
Sergey Mashkovc4694cc2017-10-09 12:51:34 +0300568 }
569
Sergey Mashkovb87d6362017-08-23 11:58:06 +0300570 private suspend fun readAvailableSuspend(dst: ByteArray, offset: Int, length: Int): Int {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300571 if (!readSuspend(1)) return -1
Roman Elizarov469cad32017-08-15 15:54:56 +0300572 return readAvailable(dst, offset, length)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300573 }
574
Sergey Mashkovb87d6362017-08-23 11:58:06 +0300575 private suspend fun readAvailableSuspend(dst: ByteBuffer): Int {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300576 if (!readSuspend(1)) return -1
Roman Elizarov469cad32017-08-15 15:54:56 +0300577 return readAvailable(dst)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300578 }
579
Sergey Mashkovc4694cc2017-10-09 12:51:34 +0300580 private suspend fun readAvailableSuspend(dst: BufferView): Int {
581 if (!readSuspend(1)) return -1
582 return readAvailable(dst)
583 }
584
Sergey Mashkovd1842c62018-01-23 23:16:54 +0300585 final override suspend fun readPacket(size: Int, headerSizeHint: Int): ByteReadPacket {
Sergey Mashkova10b6922017-08-03 13:41:54 +0300586 closed?.cause?.let { throw it }
587
Sergey Mashkov6d059932017-10-06 18:46:23 +0300588 if (size == 0) return ByteReadPacket.Empty
Sergey Mashkova10b6922017-08-03 13:41:54 +0300589
Sergey Mashkov6d059932017-10-06 18:46:23 +0300590 val builder = BytePacketBuilder(headerSizeHint)
Roman Elizarov83762032017-08-08 19:58:40 +0300591 val buffer = BufferPool.borrow()
Sergey Mashkov75101172017-09-25 19:41:08 +0300592 var remaining = size
Sergey Mashkova10b6922017-08-03 13:41:54 +0300593
594 try {
Sergey Mashkov75101172017-09-25 19:41:08 +0300595 while (remaining > 0) {
596 buffer.clear()
597 if (buffer.remaining() > remaining) {
598 buffer.limit(remaining)
599 }
600
601 val rc = readAsMuchAsPossible(buffer)
602 if (rc == 0) break
603
604 buffer.flip()
605 builder.writeFully(buffer)
606
607 remaining -= rc
608 }
609 } catch (t: Throwable) {
610 BufferPool.recycle(buffer)
611 builder.release()
612 throw t
613 }
614
615 return if (remaining == 0) {
616 BufferPool.recycle(buffer)
617 builder.build()
618 } else {
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300619 return readPacketSuspend(remaining, builder, buffer)
Sergey Mashkov75101172017-09-25 19:41:08 +0300620 }
621 }
622
623 private suspend fun readPacketSuspend(size: Int, builder: ByteWritePacketImpl, buffer: ByteBuffer): ByteReadPacket {
624 var remaining = size
625
626 try {
Sergey Mashkova10b6922017-08-03 13:41:54 +0300627 while (remaining > 0) {
628 buffer.clear()
629 if (buffer.remaining() > remaining) {
630 buffer.limit(remaining)
631 }
632
633 val rc = readFully(buffer)
Sergey Mashkov75101172017-09-25 19:41:08 +0300634
Sergey Mashkova10b6922017-08-03 13:41:54 +0300635 buffer.flip()
636 builder.writeFully(buffer)
637
638 remaining -= rc
639 }
640
Sergey Mashkov75101172017-09-25 19:41:08 +0300641
Sergey Mashkova10b6922017-08-03 13:41:54 +0300642 return builder.build()
643 } catch (t: Throwable) {
644 builder.release()
645 throw t
646 } finally {
Roman Elizarov83762032017-08-08 19:58:40 +0300647 BufferPool.recycle(buffer)
Sergey Mashkova10b6922017-08-03 13:41:54 +0300648 }
649 }
650
Sergey Mashkovd1842c62018-01-23 23:16:54 +0300651 final override suspend fun readByte(): Byte {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300652 var b: Byte = 0
653
654 val rc = reading {
655 if (it.tryReadExact(1)) {
656 b = get()
Sergey Mashkov2cb1d5b2017-07-25 11:26:14 +0300657 bytesRead(it, 1)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300658 true
659 } else false
660 }
661
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300662 if (rc) {
663 return b
Sergey Mashkov86f70612017-07-25 10:59:44 +0300664 } else {
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300665 return readByteSuspend()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300666 }
667 }
668
669 private suspend fun readByteSuspend(): Byte {
670 if (!readSuspend(1)) throw ClosedReceiveChannelException("EOF: one byte required")
671 return readByte()
672 }
673
Sergey Mashkovd1842c62018-01-23 23:16:54 +0300674 final override suspend fun readBoolean(): Boolean {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300675 var b = false
676
677 val rc = reading {
678 if (it.tryReadExact(1)) {
679 b = get() != 0.toByte()
Sergey Mashkov2cb1d5b2017-07-25 11:26:14 +0300680 bytesRead(it, 1)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300681 true
682 } else false
683 }
684
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300685 if (rc) {
686 return b
Sergey Mashkov86f70612017-07-25 10:59:44 +0300687 } else {
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300688 return readBooleanSuspend()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300689 }
690 }
691
692 private suspend fun readBooleanSuspend(): Boolean {
693 if (!readSuspend(1)) throw ClosedReceiveChannelException("EOF: one byte required")
694 return readBoolean()
695 }
696
Sergey Mashkovd1842c62018-01-23 23:16:54 +0300697 final override suspend fun readShort(): Short {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300698 var sh: Short = 0
699
700 val rc = reading {
701 if (it.tryReadExact(2)) {
702 if (remaining() < 2) rollBytes(2)
703 sh = getShort()
Sergey Mashkov2cb1d5b2017-07-25 11:26:14 +0300704 bytesRead(it, 2)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300705 true
706 } else false
707 }
708
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300709 if (rc) {
710 return sh
Sergey Mashkov86f70612017-07-25 10:59:44 +0300711 } else {
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300712 return readShortSuspend()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300713 }
714 }
715
716 private suspend fun readShortSuspend(): Short {
717 if (!readSuspend(2)) throw ClosedReceiveChannelException("EOF while byte expected")
718 return readShort()
719 }
720
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +0300721 final override suspend fun readInt(): Int {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300722 var i = 0
723
724 val rc = reading {
725 if (it.tryReadExact(4)) {
726 if (remaining() < 4) rollBytes(4)
727 i = getInt()
Sergey Mashkov2cb1d5b2017-07-25 11:26:14 +0300728 bytesRead(it, 4)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300729 true
730 } else false
731 }
732
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300733 if (rc) {
734 return i
Sergey Mashkov86f70612017-07-25 10:59:44 +0300735 } else {
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300736 return readIntSuspend()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300737 }
738 }
739
Sergey Mashkov86f70612017-07-25 10:59:44 +0300740 private suspend fun readIntSuspend(): Int {
741 if (!readSuspend(4)) throw ClosedReceiveChannelException("EOF while an int expected")
742 return readInt()
743 }
744
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +0300745 final override suspend fun readLong(): Long {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300746 var i = 0L
747
748 val rc = reading {
749 if (it.tryReadExact(8)) {
750 if (remaining() < 8) rollBytes(8)
751 i = getLong()
Sergey Mashkov2cb1d5b2017-07-25 11:26:14 +0300752 bytesRead(it, 8)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300753 true
754 } else false
755 }
756
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300757 if (rc) {
758 return i
Sergey Mashkov86f70612017-07-25 10:59:44 +0300759 } else {
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300760 return readLongSuspend()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300761 }
762 }
763
764 private suspend fun readLongSuspend(): Long {
765 if (!readSuspend(8)) throw ClosedReceiveChannelException("EOF while a long expected")
766 return readLong()
767 }
768
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +0300769 final override suspend fun readDouble(): Double {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300770 var d = 0.0
771
772 val rc = reading {
773 if (it.tryReadExact(8)) {
774 if (remaining() < 8) rollBytes(8)
775 d = getDouble()
Sergey Mashkov2cb1d5b2017-07-25 11:26:14 +0300776 bytesRead(it, 8)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300777 true
778 } else false
779 }
780
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300781 if (rc) {
782 return d
Sergey Mashkov86f70612017-07-25 10:59:44 +0300783 } else {
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300784 return readDoubleSuspend()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300785 }
786 }
787
788 private suspend fun readDoubleSuspend(): Double {
789 if (!readSuspend(8)) throw ClosedReceiveChannelException("EOF while a double expected")
790 return readDouble()
791 }
792
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +0300793 final override suspend fun readFloat(): Float {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300794 var f = 0.0f
795
796 val rc = reading {
797 if (it.tryReadExact(4)) {
798 if (remaining() < 4) rollBytes(4)
799 f = getFloat()
Sergey Mashkov2cb1d5b2017-07-25 11:26:14 +0300800 bytesRead(it, 4)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300801 true
802 } else false
803 }
804
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300805 if (rc) {
806 return f
Sergey Mashkov86f70612017-07-25 10:59:44 +0300807 } else {
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300808 return readFloatSuspend()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300809 }
810 }
811
812 private suspend fun readFloatSuspend(): Float {
813 if (!readSuspend(4)) throw ClosedReceiveChannelException("EOF while an int expected")
814 return readFloat()
815 }
816
Sergey Mashkov86fe5912017-07-25 11:20:01 +0300817 private fun ByteBuffer.rollBytes(n: Int) {
Sergey Mashkova432a822018-05-21 12:50:14 +0300818 val rem = remaining()
819
Sergey Mashkov86fe5912017-07-25 11:20:01 +0300820 limit(position() + n)
Sergey Mashkova432a822018-05-21 12:50:14 +0300821 for (i in 0 until n - rem) {
Sergey Mashkov86fe5912017-07-25 11:20:01 +0300822 put(capacity() + ReservedLongIndex + i, get(i))
823 }
824 }
825
Sergey Mashkov86f70612017-07-25 10:59:44 +0300826 private fun ByteBuffer.carry() {
Roman Elizarov83762032017-08-08 19:58:40 +0300827 val base = capacity() - reservedSize
Sergey Mashkov86f70612017-07-25 10:59:44 +0300828 for (i in base until position()) {
829 put(i - base, get(i))
830 }
831 }
832
833 private fun ByteBuffer.bytesWritten(c: RingBufferCapacity, n: Int) {
834 require(n >= 0)
835
836 writePosition = carryIndex(writePosition + n)
837 c.completeWrite(n)
Sergey Mashkovaae062d2017-09-07 15:12:50 +0300838 totalBytesWritten += n
Sergey Mashkov86f70612017-07-25 10:59:44 +0300839 }
840
Sergey Mashkov2cb1d5b2017-07-25 11:26:14 +0300841 private fun ByteBuffer.bytesRead(c: RingBufferCapacity, n: Int) {
842 require(n >= 0)
843
844 readPosition = carryIndex(readPosition + n)
845 c.completeRead(n)
Sergey Mashkovaae062d2017-09-07 15:12:50 +0300846 totalBytesRead += n
Sergey Mashkov2cb1d5b2017-07-25 11:26:14 +0300847 resumeWriteOp()
848 }
849
Sergey Mashkov2874d162017-11-17 13:56:46 +0300850 private tailrec fun resolveDelegation(current: ByteBufferChannel, joining: JoiningState): ByteBufferChannel? {
Sergey Mashkov4fe8eec2017-11-17 13:33:56 +0300851 if (current.state === ReadWriteBufferState.Terminated) {
852 val joinedTo = joining.delegatedTo
853 val nextJoining = joinedTo.joining ?: return joinedTo
854 return resolveDelegation(joinedTo, nextJoining)
855 }
856
Sergey Mashkov2874d162017-11-17 13:56:46 +0300857 return null
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300858 }
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300859
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300860 private suspend fun delegateByte(b: Byte) {
861 val joined = joining!!
862 if (state === ReadWriteBufferState.Terminated) return joined.delegatedTo.writeByte(b)
863 return delegateSuspend(joined) { writeByte(b) }
864 }
865
866 private suspend fun delegateSuspend(joined: JoiningState, block: suspend ByteBufferChannel.() -> Unit) {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300867 while (true) {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300868 if (state === ReadWriteBufferState.Terminated) return block(joined.delegatedTo)
869 writeSuspend(1)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300870 }
871 }
872
Sergey Mashkov86f70612017-07-25 10:59:44 +0300873 suspend override fun writeByte(b: Byte) {
Sergey Mashkov2874d162017-11-17 13:56:46 +0300874 joining?.let { resolveDelegation(this, it)?.let { return it.writeByte(b) } }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300875
876 val buffer = setupStateForWrite() ?: return delegateByte(b)
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300877 val c = state.capacity
878
879 return tryWriteByte(buffer, b, c)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300880 }
881
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300882 private suspend fun tryWriteByte(buffer: ByteBuffer, b: Byte, c: RingBufferCapacity) {
883 if (!c.tryWriteExact(1)) {
884 return writeByteSuspend(buffer, b, c)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300885 }
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300886
887 doWrite(buffer, b, c)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300888 }
889
Sergey Mashkovfde1a642017-10-10 14:23:34 +0300890 private fun doWrite(buffer: ByteBuffer, b: Byte, c: RingBufferCapacity) {
891 buffer.put(b)
892 buffer.bytesWritten(c, 1)
893 if (c.isFull() || autoFlush) flush()
894 restoreStateAfterWrite()
895 }
896
897 private suspend fun writeByteSuspend(buffer: ByteBuffer, b: Byte, c: RingBufferCapacity) {
898 try {
899 writeSuspend(1)
900 } catch (t: Throwable) {
901 restoreStateAfterWrite()
902 tryTerminate()
903 throw t
904 }
905
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300906 if (joining != null) {
907 restoreStateAfterWrite()
908 return delegateByte(b)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300909 }
910
Sergey Mashkovbab9f792017-11-07 19:42:15 +0300911 buffer.prepareBuffer(writeByteOrder, writePosition, c.availableForWrite)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300912 return tryWriteByte(buffer, b, c)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300913 }
914
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300915 private suspend fun delegateShort(s: Short) {
916 val joined = joining!!
917 if (state === ReadWriteBufferState.Terminated) return joined.delegatedTo.writeShort(s)
918 return delegateSuspend(joined) { writeShort(s) }
919 }
920
Sergey Mashkov86f70612017-07-25 10:59:44 +0300921 suspend override fun writeShort(s: Short) {
Sergey Mashkov2874d162017-11-17 13:56:46 +0300922 joining?.let { resolveDelegation(this, it)?.let { return it.writeShort(s) } }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300923
924 val buffer = setupStateForWrite() ?: return delegateShort(s)
Sergey Mashkovd0cb0b12017-10-10 14:44:39 +0300925 val c = state.capacity
926
927 return tryWriteShort(buffer, s, c)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300928 }
929
Sergey Mashkovd0cb0b12017-10-10 14:44:39 +0300930 private fun doWrite(buffer: ByteBuffer, s: Short, c: RingBufferCapacity) {
931 buffer.apply {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300932 if (remaining() < 2) {
933 limit(capacity())
934 putShort(s)
935 carry()
936 } else {
937 putShort(s)
938 }
939
940 bytesWritten(c, 2)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300941 }
942
Sergey Mashkovd0cb0b12017-10-10 14:44:39 +0300943 if (c.isFull() || autoFlush) flush()
944 restoreStateAfterWrite()
945 }
946
947 private suspend fun tryWriteShort(buffer: ByteBuffer, s: Short, c: RingBufferCapacity) {
948 if (!c.tryWriteExact(2)) {
949 return writeShortSuspend(buffer, s, c)
950 }
951
952 return doWrite(buffer, s, c)
953 }
954
955 private suspend fun writeShortSuspend(buffer: ByteBuffer, s: Short, c: RingBufferCapacity) {
956 try {
957 writeSuspend(2)
958 } catch (t: Throwable) {
959 restoreStateAfterWrite()
960 tryTerminate()
961 throw t
962 }
963
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300964 if (joining != null) {
965 restoreStateAfterWrite()
966 return delegateShort(s)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300967 }
968
Sergey Mashkovbab9f792017-11-07 19:42:15 +0300969 buffer.prepareBuffer(writeByteOrder, writePosition, c.availableForWrite)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300970 return tryWriteShort(buffer, s, c)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300971 }
972
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300973 private suspend fun delegateInt(i: Int) {
974 val joined = joining!!
975 if (state === ReadWriteBufferState.Terminated) return joined.delegatedTo.writeInt(i)
976 return delegateSuspend(joined) { writeInt(i) }
977 }
978
Sergey Mashkov86f70612017-07-25 10:59:44 +0300979 private fun ByteBuffer.tryWriteInt(i: Int, c: RingBufferCapacity): Boolean {
980 if (c.tryWriteExact(4)) {
981 if (remaining() < 4) {
982 limit(capacity())
983 putInt(i)
984 carry()
985 } else {
986 putInt(i)
987 }
988
989 bytesWritten(c, 4)
Sergey Mashkovd0cb0b12017-10-10 14:44:39 +0300990 if (c.isFull() || autoFlush) flush()
991 restoreStateAfterWrite()
992 tryTerminate()
Sergey Mashkov86f70612017-07-25 10:59:44 +0300993 return true
994 }
995
996 return false
997 }
998
999 suspend override fun writeInt(i: Int) {
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001000 val buffer = setupStateForWrite()
1001 if (buffer == null) {
Sergey Mashkov4fe8eec2017-11-17 13:33:56 +03001002 val delegation = resolveDelegation(this, joining!!)
Sergey Mashkov2874d162017-11-17 13:56:46 +03001003 @Suppress("SuspiciousEqualsCombination")
1004 if (delegation != null && delegation !== this) return delegation.writeInt(i)
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001005 else return delegateSuspend(joining!!, { writeInt(i) })
Sergey Mashkov86f70612017-07-25 10:59:44 +03001006 }
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001007 val c = state.capacity
1008//
1009 if (buffer.tryWriteInt(i, c)) {
1010 return
1011 }
1012 return buffer.writeIntSuspend(i, c)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001013 }
1014
Sergey Mashkovd0cb0b12017-10-10 14:44:39 +03001015 private tailrec suspend fun ByteBuffer.writeIntSuspend(i: Int, c: RingBufferCapacity) {
1016 try {
1017 writeSuspend(4)
1018 } catch (t: Throwable) {
1019 restoreStateAfterWrite()
1020 tryTerminate()
1021 throw t
1022 }
1023
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001024 if (joining != null) {
1025 restoreStateAfterWrite()
1026 return delegateInt(i)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001027 }
1028
Sergey Mashkovbab9f792017-11-07 19:42:15 +03001029 prepareBuffer(writeByteOrder, writePosition, c.availableForWrite)
Sergey Mashkovd0cb0b12017-10-10 14:44:39 +03001030 if (!tryWriteInt(i, c)) {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001031 return writeIntSuspend(i, c)
Sergey Mashkovd0cb0b12017-10-10 14:44:39 +03001032 }
Sergey Mashkov86f70612017-07-25 10:59:44 +03001033 }
1034
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001035 private suspend fun delegateLong(l: Long) {
1036 val joined = joining!!
1037 if (state === ReadWriteBufferState.Terminated) return joined.delegatedTo.writeLong(l)
1038 return delegateSuspend(joined) { writeLong(l) }
1039 }
1040
Sergey Mashkov86f70612017-07-25 10:59:44 +03001041 private fun ByteBuffer.tryWriteLong(l: Long, c: RingBufferCapacity): Boolean {
1042 if (c.tryWriteExact(8)) {
1043 if (remaining() < 8) {
1044 limit(capacity())
1045 putLong(l)
1046 carry()
1047 } else {
1048 putLong(l)
1049 }
1050
1051 bytesWritten(c, 8)
Sergey Mashkov8e9138c2017-11-09 21:18:05 +03001052 if (c.isFull() || autoFlush || joining != null) flush()
Sergey Mashkovd0cb0b12017-10-10 14:44:39 +03001053 restoreStateAfterWrite()
1054 tryTerminate()
Sergey Mashkov86f70612017-07-25 10:59:44 +03001055 return true
1056 }
1057
1058 return false
1059 }
1060
1061 suspend override fun writeLong(l: Long) {
Sergey Mashkov2874d162017-11-17 13:56:46 +03001062 joining?.let { resolveDelegation(this, it)?.let { return it.writeLong(l) } }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001063
1064 val buffer = setupStateForWrite() ?: return delegateLong(l)
Sergey Mashkovd0cb0b12017-10-10 14:44:39 +03001065 val c = state.capacity
1066
1067 if (!buffer.tryWriteLong(l, c)) {
1068 return buffer.writeLongSuspend(l, c)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001069 }
1070 }
1071
Sergey Mashkovd0cb0b12017-10-10 14:44:39 +03001072 private tailrec suspend fun ByteBuffer.writeLongSuspend(l: Long, c: RingBufferCapacity) {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001073 try {
1074 writeSuspend(8)
1075 } catch (t: Throwable) {
1076 restoreStateAfterWrite()
1077 tryTerminate()
1078 throw t
1079 }
1080
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001081 if (joining != null) {
1082 restoreStateAfterWrite()
1083 return delegateLong(l)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001084 }
Sergey Mashkovbab9f792017-11-07 19:42:15 +03001085
1086 prepareBuffer(writeByteOrder, writePosition, c.availableForWrite)
Sergey Mashkovd0cb0b12017-10-10 14:44:39 +03001087 if (!tryWriteLong(l, c)) {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001088 return writeLongSuspend(l, c)
Sergey Mashkovd0cb0b12017-10-10 14:44:39 +03001089 }
Sergey Mashkov86f70612017-07-25 10:59:44 +03001090 }
1091
1092 suspend override fun writeDouble(d: Double) {
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001093 return writeLong(java.lang.Double.doubleToRawLongBits(d))
Sergey Mashkov86f70612017-07-25 10:59:44 +03001094 }
1095
1096 suspend override fun writeFloat(f: Float) {
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001097 return writeInt(java.lang.Float.floatToRawIntBits(f))
Sergey Mashkov86f70612017-07-25 10:59:44 +03001098 }
1099
Roman Elizarov469cad32017-08-15 15:54:56 +03001100 suspend override fun writeAvailable(src: ByteBuffer): Int {
Sergey Mashkov2874d162017-11-17 13:56:46 +03001101 joining?.let { resolveDelegation(this, it)?.let { return it.writeAvailable(src) } }
Sergey Mashkov324611b2017-11-10 19:00:58 +03001102
Sergey Mashkov86f70612017-07-25 10:59:44 +03001103 val copied = writeAsMuchAsPossible(src)
1104 if (copied > 0) return copied
1105
Sergey Mashkov2874d162017-11-17 13:56:46 +03001106 joining?.let { resolveDelegation(this, it)?.let { return it.writeAvailableSuspend(src) } }
Sergey Mashkov4fe8eec2017-11-17 13:33:56 +03001107 return writeAvailableSuspend(src)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001108 }
1109
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001110 suspend override fun writeAvailable(src: BufferView): Int {
Sergey Mashkov2874d162017-11-17 13:56:46 +03001111 joining?.let { resolveDelegation(this, it)?.let { return it.writeAvailable(src) } }
Sergey Mashkov4fe8eec2017-11-17 13:33:56 +03001112
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001113 val copied = writeAsMuchAsPossible(src)
1114 if (copied > 0) return copied
1115
Sergey Mashkov2874d162017-11-17 13:56:46 +03001116 joining?.let { resolveDelegation(this, it)?.let { return it.writeAvailableSuspend(src) } }
Sergey Mashkov4fe8eec2017-11-17 13:33:56 +03001117 return writeAvailableSuspend(src)
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001118 }
1119
1120 private suspend fun writeAvailableSuspend(src: ByteBuffer): Int {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001121 writeSuspend(1) // here we don't need to restoreStateAfterWrite as write copy loop doesn't hold state
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001122
Sergey Mashkov2874d162017-11-17 13:56:46 +03001123 joining?.let { resolveDelegation(this, it)?.let { return it .writeAvailableSuspend(src) } }
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001124
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001125 return writeAvailable(src)
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001126 }
1127
1128 private suspend fun writeAvailableSuspend(src: BufferView): Int {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001129 writeSuspend(1)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001130
Sergey Mashkov2874d162017-11-17 13:56:46 +03001131 joining?.let { resolveDelegation(this, it)?.let { return it.writeAvailableSuspend(src) } }
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001132
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001133 return writeAvailable(src)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001134 }
1135
1136 suspend override fun writeFully(src: ByteBuffer) {
Sergey Mashkovd42bdad2017-11-17 19:29:49 +03001137 joining?.let { resolveDelegation(this, it)?.let { return it.writeFully(src) } }
Sergey Mashkov324611b2017-11-10 19:00:58 +03001138
Sergey Mashkov86f70612017-07-25 10:59:44 +03001139 writeAsMuchAsPossible(src)
1140 if (!src.hasRemaining()) return
1141
1142 return writeFullySuspend(src)
1143 }
1144
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001145 suspend override fun writeFully(src: BufferView) {
1146 writeAsMuchAsPossible(src)
1147 if (!src.canRead()) return
1148
1149 return writeFullySuspend(src)
1150 }
1151
Sergey Mashkov86f70612017-07-25 10:59:44 +03001152 private suspend fun writeFullySuspend(src: ByteBuffer) {
1153 while (src.hasRemaining()) {
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001154 tryWriteSuspend(1)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001155
Sergey Mashkov2874d162017-11-17 13:56:46 +03001156 joining?.let { resolveDelegation(this, it)?.let { return it.writeFully(src) } }
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001157
Sergey Mashkov86f70612017-07-25 10:59:44 +03001158 writeAsMuchAsPossible(src)
1159 }
1160 }
1161
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001162 private suspend fun writeFullySuspend(src: BufferView) {
1163 while (src.canRead()) {
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001164 tryWriteSuspend(1)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001165
Sergey Mashkov2874d162017-11-17 13:56:46 +03001166 joining?.let { resolveDelegation(this, it)?.let { return it.writeFully(src) } }
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001167
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001168 writeAsMuchAsPossible(src)
1169 }
1170 }
1171
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001172 private suspend fun awaitClose() {
Sergey Mashkovd489d3f2017-11-08 07:15:01 +03001173 if (closed != null) return
Sergey Mashkov373d5902017-11-10 12:22:23 +03001174 val joined = joining
1175
1176 if (joined != null) {
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001177 return joined.awaitClose()
Sergey Mashkov373d5902017-11-10 12:22:23 +03001178 } else if (closed == null) {
1179 error("Only works for joined")
1180 }
Sergey Mashkovd489d3f2017-11-08 07:15:01 +03001181 }
1182
Sergey Mashkov78317162018-03-21 10:12:40 +03001183 internal suspend fun joinFrom(src: ByteBufferChannel, delegateClose: Boolean, delegateFlush: Boolean) {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001184 if (src.closed != null && src.state === ReadWriteBufferState.Terminated) {
Sergey Mashkovca8762b2017-11-08 06:11:53 +03001185 if (delegateClose) close(src.closed!!.cause)
1186 return
1187 }
Sergey Mashkov9e9929d2017-11-14 14:52:32 +03001188 closed?.let { closed ->
1189 if (src.closed == null) throw closed.sendException
1190 return
1191 }
Sergey Mashkovca8762b2017-11-08 06:11:53 +03001192
Sergey Mashkov78317162018-03-21 10:12:40 +03001193 val joined = src.setupDelegateTo(this, delegateClose, delegateFlush)
Sergey Mashkov9e9929d2017-11-14 14:52:32 +03001194 if (src.tryCompleteJoining(joined)) {
1195 return src.awaitClose()
1196 }
1197
1198 return joinFromSuspend(src, delegateClose, joined)
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001199 }
1200
Sergey Mashkov9e9929d2017-11-14 14:52:32 +03001201 private suspend fun joinFromSuspend(src: ByteBufferChannel, delegateClose: Boolean, joined: JoiningState) {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001202 copyDirect(src, Long.MAX_VALUE, joined)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001203
1204 if (delegateClose && src.isClosedForRead) {
1205 close()
1206 } else {
Sergey Mashkov78317162018-03-21 10:12:40 +03001207 if (joined.delegateFlush) {
1208 flush()
1209 }
Sergey Mashkovd489d3f2017-11-08 07:15:01 +03001210 src.awaitClose()
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001211 }
1212 }
1213
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001214 internal suspend fun copyDirect(src: ByteBufferChannel, limit: Long, joined: JoiningState?): Long {
1215 if (limit == 0L) return 0L
1216 if (src.isClosedForRead) {
1217 if (joined != null) {
Sergey Mashkov324611b2017-11-10 19:00:58 +03001218 check(src.tryCompleteJoining(joined))
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001219 }
1220 return 0L
1221 }
Sergey Mashkov324611b2017-11-10 19:00:58 +03001222 if (joined != null && src.tryCompleteJoining(joined)) {
1223 return 0L
1224 }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001225
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001226 val autoFlush = autoFlush
1227 val byteOrder = writeByteOrder
1228
1229 try {
1230 var copied = 0L
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001231 while (copied < limit) {
1232 writing { dstBuffer, state ->
1233 while (copied < limit) {
1234 var avWBefore = state.availableForWrite
1235 if (avWBefore == 0) {
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001236 tryWriteSuspend(1)
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001237 if (joining != null) break
1238 avWBefore = state.availableForWrite
1239 }
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001240
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001241 dstBuffer.prepareBuffer(byteOrder, writePosition, avWBefore)
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001242
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001243 var partSize = 0
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001244
Sergey Mashkovec2af2f2018-01-15 17:02:25 +03001245 src.reading { srcState ->
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001246 val srcBuffer = this
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001247
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001248 val rem = minOf(srcBuffer.remaining().toLong(), dstBuffer.remaining().toLong(), limit - copied).toInt()
1249 val n = state.tryWriteAtMost(rem)
1250 if (n > 0) {
1251 if (!srcState.tryReadExact(n)) throw AssertionError()
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001252
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001253 srcBuffer.limit(srcBuffer.position() + n)
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001254
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001255 dstBuffer.put(srcBuffer)
1256 partSize = n
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001257
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001258 with(src) {
1259 srcBuffer.bytesRead(srcState, n)
1260 }
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001261 }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001262
1263 true
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001264 }
1265
Sergey Mashkovec2af2f2018-01-15 17:02:25 +03001266 if (partSize > 0) {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001267 dstBuffer.bytesWritten(state, partSize)
1268 copied += partSize
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001269
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001270 if (avWBefore - partSize == 0 || autoFlush) {
1271 flush()
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001272 }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001273 } else {
1274 break
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001275 }
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001276 }
1277 }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001278
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001279 if (joined != null) {
1280 if (src.tryCompleteJoining(joined)) break
1281 else if (src.state.capacity.flush()) { // force flush src to read-up all the bytes
1282 src.resumeWriteOp()
1283 continue
1284 }
1285 }
1286
Sergey Mashkov373d5902017-11-10 12:22:23 +03001287 if (copied >= limit) break
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001288
Sergey Mashkov324611b2017-11-10 19:00:58 +03001289 flush()
1290
Sergey Mashkovd1842c62018-01-23 23:16:54 +03001291 if (src.availableForRead == 0) {
1292 if (src.readSuspendImpl(1)) {
1293 if (joined != null && src.tryCompleteJoining(joined)) break
1294 } else if (joined == null || src.tryCompleteJoining(joined)) break
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001295 }
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +03001296
1297 if (joining != null) {
Sergey Mashkovec2af2f2018-01-15 17:02:25 +03001298 tryWriteSuspend(1)
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +03001299 }
Sergey Mashkov88951ac2017-10-27 22:36:08 +03001300 }
1301
1302 if (autoFlush) {
1303 flush()
1304 }
1305
1306 return copied
1307 } catch (t: Throwable) {
1308 close(t)
1309 throw t
1310 }
1311 }
1312
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001313 private fun ensureClosedJoined(joined: JoiningState) {
1314 val closed = closed ?: return
1315 this.joining = null
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001316
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001317 if (joined.delegateClose) {
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +03001318 // writing state could be if we are inside of copyDirect loop
1319 // so in this case we shouldn't close channel
1320 // otherwise few bytes could be lost
1321 // it will be closed later in copyDirect's finalization
1322 // so we only do flush
1323 val writing = joined.delegatedTo.state.let { it is ReadWriteBufferState.Writing || it is ReadWriteBufferState.ReadingWriting }
1324 if (closed.cause != null || !writing) {
1325 joined.delegatedTo.close(closed.cause)
Sergey Mashkov78317162018-03-21 10:12:40 +03001326 } else if (joined.delegateFlush) {
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +03001327 joined.delegatedTo.flush()
1328 }
Sergey Mashkov78317162018-03-21 10:12:40 +03001329 } else if (joined.delegateFlush) {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001330 joined.delegatedTo.flush()
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001331 }
1332
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001333 joined.complete()
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001334 }
1335
Sergey Mashkov86f70612017-07-25 10:59:44 +03001336 private fun writeAsMuchAsPossible(src: ByteBuffer): Int {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001337 writing { dst, state ->
Sergey Mashkov86f70612017-07-25 10:59:44 +03001338 var written = 0
Sergey Mashkov0f7e9222017-10-10 13:00:38 +03001339 val srcLimit = src.limit()
Sergey Mashkov86f70612017-07-25 10:59:44 +03001340
1341 do {
Sergey Mashkov0f7e9222017-10-10 13:00:38 +03001342 val srcRemaining = srcLimit - src.position()
1343 if (srcRemaining == 0) break
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001344 val possibleSize = state.tryWriteAtMost(minOf(srcRemaining, dst.remaining()))
Sergey Mashkov86f70612017-07-25 10:59:44 +03001345 if (possibleSize == 0) break
1346 require(possibleSize > 0)
1347
Sergey Mashkov0f7e9222017-10-10 13:00:38 +03001348 src.limit(src.position() + possibleSize)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001349 dst.put(src)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001350
1351 written += possibleSize
1352
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001353 dst.prepareBuffer(writeByteOrder, dst.carryIndex(writePosition + written), state.availableForWrite)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001354 } while (true)
1355
Sergey Mashkov0f7e9222017-10-10 13:00:38 +03001356 src.limit(srcLimit)
1357
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001358 dst.bytesWritten(state, written)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001359
1360 return written
1361 }
1362
1363 return 0
1364 }
1365
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001366 private fun writeAsMuchAsPossible(src: BufferView): Int {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001367 writing { dst, state ->
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001368 var written = 0
1369
1370 do {
1371 val srcSize = src.readRemaining
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001372 val possibleSize = state.tryWriteAtMost(minOf(srcSize, dst.remaining()))
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001373 if (possibleSize == 0) break
1374
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001375 src.read(dst, possibleSize)
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001376
1377 written += possibleSize
1378
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001379 dst.prepareBuffer(writeByteOrder, dst.carryIndex(writePosition + written), state.availableForWrite)
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001380 } while (true)
1381
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001382 dst.bytesWritten(state, written)
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03001383
1384 return written
1385 }
1386
1387 return 0
1388 }
1389
Sergey Mashkov86f70612017-07-25 10:59:44 +03001390 private fun writeAsMuchAsPossible(src: ByteArray, offset: Int, length: Int): Int {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001391 writing { dst, state ->
Sergey Mashkov86f70612017-07-25 10:59:44 +03001392 var written = 0
1393
1394 do {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001395 val possibleSize = state.tryWriteAtMost(minOf(length - written, dst.remaining()))
Sergey Mashkov86f70612017-07-25 10:59:44 +03001396 if (possibleSize == 0) break
1397 require(possibleSize > 0)
1398
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001399 dst.put(src, offset + written, possibleSize)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001400 written += possibleSize
1401
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001402 dst.prepareBuffer(writeByteOrder, dst.carryIndex(writePosition + written), state.availableForWrite)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001403 } while (true)
1404
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001405 dst.bytesWritten(state, written)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001406
1407 return written
1408 }
1409
1410 return 0
1411 }
1412
1413 suspend override fun writeFully(src: ByteArray, offset: Int, length: Int) {
Sergey Mashkov2874d162017-11-17 13:56:46 +03001414 joining?.let { resolveDelegation(this, it)?.let { return it .writeFully(src, offset, length) } }
Sergey Mashkov324611b2017-11-10 19:00:58 +03001415
Sergey Mashkov86f70612017-07-25 10:59:44 +03001416 var rem = length
1417 var off = offset
1418
1419 while (rem > 0) {
1420 val s = writeAsMuchAsPossible(src, off, rem)
1421 if (s == 0) break
1422
1423 off += s
1424 rem -= s
1425 }
1426
1427 if (rem == 0) return
1428
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001429 return writeFullySuspend(src, off, rem)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001430 }
1431
1432 private tailrec suspend fun writeFullySuspend(src: ByteArray, offset: Int, length: Int) {
1433 if (length == 0) return
Sergey Mashkov7c99a992018-05-29 21:34:37 +03001434 writeSuspendUnit(src, offset, length)
Sergey Mashkov9faa0392018-06-13 19:49:45 +03001435 val copied = writeSuspendResult.takeIf { it != -1 } ?:
1436 joining?.let { resolveDelegation(this, it) }?.writeSuspendResult?.takeIf { it != -1 } ?:
1437 error("-1 should be only set in case of joining")
1438
Sergey Mashkov86f70612017-07-25 10:59:44 +03001439 return writeFullySuspend(src, offset + copied, length - copied)
1440 }
1441
Roman Elizarov469cad32017-08-15 15:54:56 +03001442 suspend override fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int {
Sergey Mashkov2874d162017-11-17 13:56:46 +03001443 joining?.let { resolveDelegation(this, it)?.let { return it.writeAvailable(src, offset, length) } }
Sergey Mashkov324611b2017-11-10 19:00:58 +03001444
Sergey Mashkov86f70612017-07-25 10:59:44 +03001445 val size = writeAsMuchAsPossible(src, offset, length)
1446 if (size > 0) return size
1447 return writeSuspend(src, offset, length)
1448 }
1449
Sergey Mashkov7c99a992018-05-29 21:34:37 +03001450 private var writeSuspendResult: Int = 0
1451
1452 private suspend fun writeSuspendUnit(src: ByteArray, offset: Int, length: Int) {
1453 while (true) {
1454 tryWriteSuspend(1)
1455
Sergey Mashkov9faa0392018-06-13 19:49:45 +03001456 joining?.let {
1457 resolveDelegation(this, it)?.let {
1458 writeSuspendResult = -1;
1459 return it.writeSuspendUnit(src, offset, length)
1460 }
1461 }
Sergey Mashkov7c99a992018-05-29 21:34:37 +03001462
1463 val size = writeAsMuchAsPossible(src, offset, length)
1464 if (size > 0) {
1465 writeSuspendResult = size
1466 return
1467 }
1468 }
1469 }
1470
Sergey Mashkov86f70612017-07-25 10:59:44 +03001471 private suspend fun writeSuspend(src: ByteArray, offset: Int, length: Int): Int {
1472 while (true) {
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001473 tryWriteSuspend(1)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001474
Sergey Mashkov2874d162017-11-17 13:56:46 +03001475 joining?.let { resolveDelegation(this, it)?.let { return it.writeSuspend(src, offset, length) } }
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001476
Sergey Mashkov86f70612017-07-25 10:59:44 +03001477 val size = writeAsMuchAsPossible(src, offset, length)
1478 if (size > 0) return size
1479 }
1480 }
1481
Sergey Mashkov75101172017-09-25 19:41:08 +03001482 override suspend fun write(min: Int, block: (ByteBuffer) -> Unit) {
1483 require(min > 0) { "min should be positive"}
Sergey Mashkov56061332017-08-25 12:37:53 +03001484
Sergey Mashkov75101172017-09-25 19:41:08 +03001485 var written = false
Sergey Mashkov56061332017-08-25 12:37:53 +03001486
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001487 writing { dst, state ->
Sergey Mashkov7bf4fa92017-11-17 18:39:03 +03001488 val locked = state.tryWriteAtLeast(min)
1489
1490 if (locked > 0) {
1491 // here we have locked all remaining for write bytes
1492 // however we don't know how many bytes will be actually written
1493 // so later we have to return (locked - actuallyWritten) bytes back
1494
1495 // it is important to lock bytes to fail concurrent tryLockForRelease
1496 // once we have locked some bytes, tryLockForRelease will fail so it is safe to use buffer
1497
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001498 val position = dst.position()
1499 val l = dst.limit()
1500 block(dst)
1501 if (l != dst.limit()) throw IllegalStateException("buffer limit modified")
Sergey Mashkov56061332017-08-25 12:37:53 +03001502
Sergey Mashkov7bf4fa92017-11-17 18:39:03 +03001503 val actuallyWritten = dst.position() - position
1504 if (actuallyWritten < 0) throw IllegalStateException("position has been moved backward: pushback is not supported")
1505
1506 dst.bytesWritten(state, actuallyWritten)
1507
1508 if (actuallyWritten < locked) {
1509 state.completeRead(locked - actuallyWritten) // return back extra bytes (see note above)
1510 // we use completeRead in spite of that it is write block
1511 // we don't need to resume write as we are already in writing block
1512 }
1513
Sergey Mashkov75101172017-09-25 19:41:08 +03001514 written = true
Sergey Mashkov56061332017-08-25 12:37:53 +03001515 }
Sergey Mashkov75101172017-09-25 19:41:08 +03001516 }
Sergey Mashkov56061332017-08-25 12:37:53 +03001517
Sergey Mashkov75101172017-09-25 19:41:08 +03001518 if (!written) {
1519 return writeBlockSuspend(min, block)
1520 }
1521 }
1522
1523 private suspend fun writeBlockSuspend(min: Int, block: (ByteBuffer) -> Unit) {
1524 writeSuspend(min)
Sergey Mashkov2874d162017-11-17 13:56:46 +03001525 joining?.let { resolveDelegation(this, it)?.let { return it.write(min, block) } }
Sergey Mashkov4fe8eec2017-11-17 13:33:56 +03001526 return write(min, block)
Sergey Mashkov75101172017-09-25 19:41:08 +03001527 }
1528
Sergey Mashkovdfae5e22017-12-18 19:31:27 +03001529 override suspend fun writeWhile(block: (ByteBuffer) -> Boolean) {
1530 if (!writeWhileNoSuspend(block)) return
1531 closed?.let { throw it.sendException }
1532 return writeWhileSuspend(block)
1533 }
1534
1535 private fun writeWhileNoSuspend(block: (ByteBuffer) -> Boolean): Boolean {
1536 var continueWriting = true
1537
1538 writing { dst, capacity ->
1539 continueWriting = writeWhileLoop(dst, capacity, block)
1540 }
1541
1542 return continueWriting
1543 }
1544
1545 private suspend fun writeWhileSuspend(block: (ByteBuffer) -> Boolean) {
1546 var continueWriting = true
1547
1548 writing { dst, capacity ->
1549 while (true) {
1550 writeSuspend(1)
1551 if (joining != null) break
1552 if (!writeWhileLoop(dst, capacity, block)) {
1553 continueWriting = false
1554 break
1555 }
1556 if (closed != null) break
1557 }
1558 }
1559
1560 if (!continueWriting) return
1561 closed?.let { throw it.sendException }
1562 joining?.let { return writeWhile(block) }
1563 }
1564
1565 // it should be writing state set to use this function
1566 private fun writeWhileLoop(dst: ByteBuffer, capacity: RingBufferCapacity, block: (ByteBuffer) -> Boolean): Boolean {
1567 var continueWriting = true
1568 val bufferLimit = dst.capacity() - reservedSize
1569
1570 while (continueWriting) {
1571 val locked = capacity.tryWriteAtLeast(1) // see comments in [write]
1572 if (locked == 0) break
1573
1574 val position = writePosition
1575 val l = (position + locked).coerceAtMost(bufferLimit)
1576 dst.limit(l)
1577 dst.position(position)
1578
1579 continueWriting = try {
1580 block(dst)
1581 } catch (t: Throwable) {
1582 capacity.completeRead(locked)
1583 throw t
1584 }
1585
1586 if (dst.limit() != l) throw IllegalStateException("buffer limit modified")
1587 val actuallyWritten = dst.position() - position
1588 if (actuallyWritten < 0) throw IllegalStateException("position has been moved backward: pushback is not supported")
1589
1590 dst.bytesWritten(capacity, actuallyWritten)
1591 if (actuallyWritten < locked) {
1592 capacity.completeRead(locked - actuallyWritten) // return back extra bytes
1593 // it is important to use completeRead in spite of that we are writing here
1594 // no need to resume read here
1595 }
1596 }
1597
1598 return continueWriting
1599 }
1600
Sergey Mashkov75101172017-09-25 19:41:08 +03001601 override suspend fun read(min: Int, block: (ByteBuffer) -> Unit) {
Sergey Mashkove86eb082017-12-04 18:45:05 +03001602 require(min >= 0) { "min should be positive or zero" }
Sergey Mashkov75101172017-09-25 19:41:08 +03001603
1604 val read = reading {
Sergey Mashkove86eb082017-12-04 18:45:05 +03001605 val av = it.availableForRead
1606 if (av > 0 && av >= min) {
Sergey Mashkov75101172017-09-25 19:41:08 +03001607 val position = this.position()
1608 val l = this.limit()
1609 block(this)
1610 if (l != this.limit()) throw IllegalStateException("buffer limit modified")
1611 val delta = position() - position
1612 if (delta < 0) throw IllegalStateException("position has been moved backward: pushback is not supported")
1613
1614 if (!it.tryReadExact(delta)) throw IllegalStateException()
1615 bytesRead(it, delta)
1616 true
1617 }
1618 else false
1619 }
1620
1621 if (!read) {
Sergey Mashkov350d1442017-12-05 15:25:05 +03001622 if (isClosedForRead) return
Sergey Mashkovc143ea62017-10-10 15:21:35 +03001623 return readBlockSuspend(min, block)
Sergey Mashkov75101172017-09-25 19:41:08 +03001624 }
1625 }
1626
Sergey Mashkov4df720f2017-12-05 16:45:55 +03001627 override suspend fun discard(max: Long): Long {
1628 require(max >= 0) { "max shouldn't be negative: $max" }
1629
1630 var discarded = 0L
1631
1632 reading {
1633 val n = it.tryReadAtMost(minOf(Int.MAX_VALUE.toLong(), max).toInt())
1634 bytesRead(it, n)
1635 discarded += n
1636 true
1637 }
1638
1639 if (discarded == max || isClosedForRead) return discarded
1640
1641 return discardSuspend(discarded, max)
1642 }
1643
1644 private suspend fun discardSuspend(discarded0: Long, max: Long): Long {
1645 var discarded = discarded0
1646
1647 while (discarded < max) {
1648 val rc = reading {
1649 val n = it.tryReadAtMost(minOf(Int.MAX_VALUE.toLong(), max - discarded).toInt())
1650 bytesRead(it, n)
1651 discarded += n
1652
1653 true
1654 }
1655
1656 if (!rc) {
1657 if (isClosedForRead || !readSuspend(1)) break
1658 }
1659 }
1660
1661 return discarded
1662 }
1663
Sergey Mashkov75101172017-09-25 19:41:08 +03001664 private suspend fun readBlockSuspend(min: Int, block: (ByteBuffer) -> Unit) {
Sergey Mashkove86eb082017-12-04 18:45:05 +03001665 if (!readSuspend(min.coerceAtLeast(1))) {
1666 if (min > 0)
1667 throw EOFException("Got EOF but at least $min bytes were expected")
1668 else return
1669 }
1670
Sergey Mashkov75101172017-09-25 19:41:08 +03001671 read(min, block)
1672 }
1673
Sergey Mashkovd1842c62018-01-23 23:16:54 +03001674 override suspend fun writePacket(packet: ByteReadPacket) {
Sergey Mashkov2874d162017-11-17 13:56:46 +03001675 joining?.let { resolveDelegation(this, it)?.let { return it.writePacket(packet) } }
Sergey Mashkov324611b2017-11-10 19:00:58 +03001676
Sergey Mashkov75101172017-09-25 19:41:08 +03001677 try {
Sergey Mashkov6d059932017-10-06 18:46:23 +03001678 while (!packet.isEmpty) {
1679 if (tryWritePacketPart(packet) == 0) break
Sergey Mashkov75101172017-09-25 19:41:08 +03001680 }
1681 } catch (t: Throwable) {
Sergey Mashkov75101172017-09-25 19:41:08 +03001682 packet.release()
1683 throw t
1684 }
1685
Sergey Mashkov6d059932017-10-06 18:46:23 +03001686 if (packet.remaining > 0) {
Sergey Mashkov2874d162017-11-17 13:56:46 +03001687 joining?.let { resolveDelegation(this, it)?.let { return it.writePacket(packet) } }
Sergey Mashkov4fe8eec2017-11-17 13:33:56 +03001688 return writePacketSuspend(packet)
Sergey Mashkov56061332017-08-25 12:37:53 +03001689 }
Sergey Mashkov56061332017-08-25 12:37:53 +03001690 }
1691
Sergey Mashkov6d059932017-10-06 18:46:23 +03001692 private suspend fun writePacketSuspend(packet: ByteReadPacket) {
Sergey Mashkov56061332017-08-25 12:37:53 +03001693 try {
Sergey Mashkov6d059932017-10-06 18:46:23 +03001694 while (!packet.isEmpty) {
1695 writeSuspend(1)
Sergey Mashkov4fe8eec2017-11-17 13:33:56 +03001696
Sergey Mashkov2874d162017-11-17 13:56:46 +03001697 joining?.let { resolveDelegation(this, it)?.let { return it.writePacket(packet) } }
Sergey Mashkov6d059932017-10-06 18:46:23 +03001698 tryWritePacketPart(packet)
1699 }
Sergey Mashkov56061332017-08-25 12:37:53 +03001700 } finally {
Sergey Mashkov56061332017-08-25 12:37:53 +03001701 packet.release()
Sergey Mashkova10b6922017-08-03 13:41:54 +03001702 }
1703 }
1704
Sergey Mashkov6d059932017-10-06 18:46:23 +03001705 private fun tryWritePacketPart(packet: ByteReadPacket): Int {
1706 var copied = 0
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001707 writing { dst, state ->
1708 val size = state.tryWriteAtMost(minOf(packet.remaining, dst.remaining()))
Sergey Mashkov6d059932017-10-06 18:46:23 +03001709 if (size > 0) {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +03001710 dst.limit(dst.position() + size)
1711 packet.readFully(dst)
1712 dst.bytesWritten(state, size)
Sergey Mashkov6d059932017-10-06 18:46:23 +03001713 }
1714 copied = size
1715 }
1716
1717 return copied
1718 }
1719
Sergey Mashkov86f70612017-07-25 10:59:44 +03001720 /**
1721 * Invokes [visitor] for every available batch until all bytes processed or visitor if visitor returns false.
1722 * Never invokes [visitor] with empty buffer unless [last] = true. Invokes visitor with last = true at most once
1723 * even if there are remaining bytes and visitor returned true.
1724 */
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001725 final override suspend fun consumeEachBufferRange(visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean) {
Sergey Mashkov4cd00142017-09-05 19:25:32 +03001726 if (consumeEachBufferRangeFast(false, visitor)) return
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001727 return consumeEachBufferRangeSuspend(visitor)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001728 }
1729
Sergey Mashkov4cd00142017-09-05 19:25:32 +03001730 override fun <R> lookAhead(visitor: LookAheadSession.() -> R): R {
1731 if (state === ReadWriteBufferState.Terminated) {
1732 return visitor(TerminatedLookAhead)
1733 }
1734
1735 var result: R? = null
1736 val rc = reading {
1737 result = visitor(this@ByteBufferChannel)
1738 true
1739 }
1740
1741 if (!rc) {
1742 return visitor(TerminatedLookAhead)
1743 }
1744
1745 return result!!
1746 }
1747
1748 suspend override fun <R> lookAheadSuspend(visitor: suspend LookAheadSuspendSession.() -> R): R {
1749 if (state === ReadWriteBufferState.Terminated) {
1750 return visitor(TerminatedLookAhead)
1751 }
1752
1753 var result: R? = null
1754 val rc = reading {
1755 result = visitor(this@ByteBufferChannel)
1756 true
1757 }
1758
1759 if (!rc) {
1760 if (closed != null || state === ReadWriteBufferState.Terminated) return visitor(TerminatedLookAhead)
1761 result = visitor(this)
Sergey Mashkov8a30b452017-09-07 13:15:31 +03001762 if (!state.idle && state !== ReadWriteBufferState.Terminated) {
Sergey Mashkov4cd00142017-09-05 19:25:32 +03001763 restoreStateAfterRead()
1764 tryTerminate()
1765 }
1766 }
1767
1768 return result!!
1769 }
1770
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001771 override suspend fun writeSuspendSession(visitor: suspend WriterSuspendSession.() -> Unit) {
Sergey Mashkove15eae82018-06-01 17:23:47 +03001772 var locked = 0
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001773
Sergey Mashkove15eae82018-06-01 17:23:47 +03001774 var current = joining?.let { resolveDelegation(this, it) } ?: this
1775 var byteBuffer = current.setupStateForWrite() ?: return writeSuspendSession(visitor)
1776 var ringBufferCapacity = current.state.capacity
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001777
Sergey Mashkove15eae82018-06-01 17:23:47 +03001778 val session = object : WriterSuspendSession {
1779 override fun request(min: Int): ByteBuffer? {
1780 locked += ringBufferCapacity.tryWriteAtLeast(0)
1781 if (locked < min) return null
1782 byteBuffer.prepareBuffer(writeByteOrder, writePosition, locked)
1783 if (byteBuffer.remaining() < min) return null
1784 if (current.joining != null) return null
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001785
Sergey Mashkove15eae82018-06-01 17:23:47 +03001786 return byteBuffer
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001787 }
1788
Sergey Mashkove15eae82018-06-01 17:23:47 +03001789 override fun written(n: Int) {
1790 require(n >= 0)
1791 if (n > locked) throw IllegalStateException()
1792 locked -= n
1793 byteBuffer.bytesWritten(ringBufferCapacity, n)
1794 }
1795
1796 override suspend fun tryAwait(n: Int) {
1797 val joining = current.joining
1798 if (joining != null) {
1799 return tryAwaitJoinSwitch(n, joining)
1800 }
1801
1802 if (locked >= n) return
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001803 if (locked > 0) {
1804 ringBufferCapacity.completeRead(locked)
Sergey Mashkovd1842c62018-01-23 23:16:54 +03001805 locked = 0
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001806 }
Sergey Mashkove15eae82018-06-01 17:23:47 +03001807
1808 return tryWriteSuspend(n)
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001809 }
Sergey Mashkove15eae82018-06-01 17:23:47 +03001810
1811 private suspend fun tryAwaitJoinSwitch(n: Int, joining: JoiningState) {
1812 if (locked > 0) {
1813 ringBufferCapacity.completeRead(locked)
1814 locked = 0
1815 }
1816 flush()
1817 restoreStateAfterWrite()
1818 tryTerminate()
1819
1820 do {
1821 current.tryWriteSuspend(n)
1822 current = resolveDelegation(current, joining) ?: continue
1823 byteBuffer = current.setupStateForWrite() ?: continue
1824 ringBufferCapacity = current.state.capacity
1825 } while (false)
1826 }
1827 }
1828
1829 try {
1830 visitor(session)
1831 } finally {
1832 if (locked > 0) {
1833 ringBufferCapacity.completeRead(locked)
1834 locked = 0
1835 }
1836
1837 current.restoreStateAfterWrite()
1838 current.tryTerminate()
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001839 }
1840 }
1841
Sergey Mashkov4cd00142017-09-05 19:25:32 +03001842 override fun consumed(n: Int) {
1843 require(n >= 0)
1844
1845 state.let { s ->
1846 if (!s.capacity.tryReadExact(n)) throw IllegalStateException("Unable to consume $n bytes: not enough available bytes")
1847 s.readBuffer.bytesRead(s.capacity, n)
1848 }
1849 }
1850
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001851 final override suspend fun awaitAtLeast(n: Int): Boolean {
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001852 if (state.capacity.availableForRead >= n) {
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001853 if (state.idle || state is ReadWriteBufferState.Writing) setupStateForRead()
1854 return true
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001855 }
1856
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001857 if (state.idle || state is ReadWriteBufferState.Writing) return awaitAtLeastSuspend(n)
1858 else if (n == 1) return readSuspendImpl(1)
1859 else return readSuspend(n)
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001860 }
1861
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001862 private suspend fun awaitAtLeastSuspend(n: Int): Boolean {
1863 val rc = readSuspend(n)
1864 if (rc && state.idle) {
Sergey Mashkov4cd00142017-09-05 19:25:32 +03001865 setupStateForRead()
1866 }
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001867 return rc
Sergey Mashkov4cd00142017-09-05 19:25:32 +03001868 }
1869
1870 override fun request(skip: Int, atLeast: Int): ByteBuffer? {
1871 return state.let { s ->
1872 val available = s.capacity.availableForRead
1873 val rp = readPosition
1874
1875 if (available < atLeast + skip) return null
1876 if (s.idle || (s !is ReadWriteBufferState.Reading && s !is ReadWriteBufferState.ReadingWriting)) return null
1877
1878 val buffer = s.readBuffer
1879
1880 val position = buffer.carryIndex(rp + skip)
1881 buffer.prepareBuffer(readByteOrder, position, available - skip)
1882
1883 if (buffer.remaining() >= atLeast) buffer else null
1884 }
1885 }
1886
1887 private inline fun consumeEachBufferRangeFast(last: Boolean, visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean): Boolean {
Sergey Mashkov86f70612017-07-25 10:59:44 +03001888 val rc = reading {
Sergey Mashkov86f70612017-07-25 10:59:44 +03001889 do {
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001890 if (hasRemaining() || last) {
1891 val rc = visitor(this, last)
1892 afterBufferVisited(this, it)
1893 if (!rc || (last && !hasRemaining())) return true
1894 } else break
1895 } while (true)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001896
1897 last
1898 }
1899
1900 if (!rc && closed != null) {
Roman Elizarov83762032017-08-08 19:58:40 +03001901 visitor(EmptyByteBuffer, true)
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001902 return true
Sergey Mashkov86f70612017-07-25 10:59:44 +03001903 }
1904
1905 return rc
1906 }
1907
Sergey Mashkov09b621e2017-12-22 20:42:18 +03001908// private suspend fun consumeEachBufferRangeSuspendLoop(visitor: RendezvousChannel<ConsumeEachBufferVisitor>) {
1909// var last = false
1910//
1911// do {
1912// if (consumeEachBufferRangeFast(last, visitor)) return
1913// if (last) return
1914// if (!readSuspend(1)) {
1915// last = true
1916// }
1917// } while (true)
1918// }
1919
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001920 private suspend fun consumeEachBufferRangeSuspend(visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean) {
Sergey Mashkov86f70612017-07-25 10:59:44 +03001921 var last = false
1922
1923 do {
Sergey Mashkovf75ec152017-11-14 14:11:46 +03001924 if (consumeEachBufferRangeFast(last, visitor)) return
1925 if (last) return
Sergey Mashkov86f70612017-07-25 10:59:44 +03001926 if (!readSuspend(1)) {
1927 last = true
1928 }
1929 } while (true)
1930 }
1931
1932 private fun afterBufferVisited(buffer: ByteBuffer, c: RingBufferCapacity): Int {
1933 val consumed = buffer.position() - readPosition
1934 if (consumed > 0) {
Sergey Mashkov86fe5912017-07-25 11:20:01 +03001935 if (!c.tryReadExact(consumed)) throw IllegalStateException("Consumed more bytes than available")
Sergey Mashkov86f70612017-07-25 10:59:44 +03001936
Sergey Mashkov2cb1d5b2017-07-25 11:26:14 +03001937 buffer.bytesRead(c, consumed)
Roman Elizarov469cad32017-08-15 15:54:56 +03001938 buffer.prepareBuffer(readByteOrder, readPosition, c.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +03001939 }
1940
1941 return consumed
1942 }
1943
1944 private suspend fun readUTF8LineToAscii(out: Appendable, limit: Int): Boolean {
1945 if (state === ReadWriteBufferState.Terminated) return false
1946
1947 var cr = false
1948 var consumed = 0
1949 var unicodeStarted = false
Sergey Mashkova9a936d2017-08-23 14:39:41 +03001950 var eol = false
Sergey Mashkov86f70612017-07-25 10:59:44 +03001951
Sergey Mashkov4cd00142017-09-05 19:25:32 +03001952 consumeEachBufferRangeFast(false) { buffer, last ->
Sergey Mashkova9a936d2017-08-23 14:39:41 +03001953 var forceConsume = false
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03001954
1955 val rejected = !buffer.decodeASCII { ch ->
Sergey Mashkov86f70612017-07-25 10:59:44 +03001956 when {
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03001957 ch == '\r' -> {
Sergey Mashkov86f70612017-07-25 10:59:44 +03001958 cr = true
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03001959 true
Sergey Mashkov86f70612017-07-25 10:59:44 +03001960 }
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03001961 ch == '\n' -> {
1962 eol = true
Sergey Mashkova9a936d2017-08-23 14:39:41 +03001963 forceConsume = true
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03001964 false
Sergey Mashkov86f70612017-07-25 10:59:44 +03001965 }
1966 cr -> {
1967 cr = false
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03001968 eol = true
1969 false
Sergey Mashkov86f70612017-07-25 10:59:44 +03001970 }
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03001971 else -> {
Sergey Mashkov86f70612017-07-25 10:59:44 +03001972 if (consumed == limit) throw BufferOverflowException()
1973 consumed++
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03001974 out.append(ch)
1975 true
Sergey Mashkov86f70612017-07-25 10:59:44 +03001976 }
1977 }
1978 }
1979
Sergey Mashkova9a936d2017-08-23 14:39:41 +03001980 if (cr && last) {
1981 eol = true
1982 }
1983
1984 if (eol && forceConsume) {
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03001985 buffer.position(buffer.position() + 1)
1986 }
1987
1988 if (rejected && buffer.hasRemaining() && !eol) {
1989 unicodeStarted = true
1990 false
1991 } else
1992 !eol && !last
Sergey Mashkov86f70612017-07-25 10:59:44 +03001993 }
1994
Sergey Mashkova9a936d2017-08-23 14:39:41 +03001995 if (eol && !unicodeStarted) return true
Sergey Mashkov86f70612017-07-25 10:59:44 +03001996 return readUTF8LineToUtf8(out, limit - consumed, cr, consumed)
1997 }
1998
1999 private suspend fun readUTF8LineToUtf8(out: Appendable, limit: Int, cr0: Boolean, consumed0: Int): Boolean {
2000 var cr1 = cr0
2001 var consumed1 = 0
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002002 var eol = false
Sergey Mashkov86f70612017-07-25 10:59:44 +03002003
Sergey Mashkov4cd00142017-09-05 19:25:32 +03002004 consumeEachBufferRangeFast(false) { buffer, last ->
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002005 var forceConsume = false
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03002006
2007 val rc = buffer.decodeUTF8 { ch ->
Sergey Mashkov86f70612017-07-25 10:59:44 +03002008 when {
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03002009 ch == '\r' -> {
Sergey Mashkov86f70612017-07-25 10:59:44 +03002010 cr1 = true
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03002011 true
Sergey Mashkov86f70612017-07-25 10:59:44 +03002012 }
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03002013 ch == '\n' -> {
2014 eol = true
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002015 forceConsume = true
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03002016 false
Sergey Mashkov86f70612017-07-25 10:59:44 +03002017 }
2018 cr1 -> {
2019 cr1 = false
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03002020 eol = true
2021 false
Sergey Mashkov86f70612017-07-25 10:59:44 +03002022 }
2023 else -> {
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03002024 if (consumed1 == limit) throw BufferOverflowException()
2025 consumed1++
2026 out.append(ch)
2027 true
Sergey Mashkov86f70612017-07-25 10:59:44 +03002028 }
2029 }
2030 }
2031
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002032 if (cr1 && last) {
2033 eol = true
2034 }
2035
2036 if (eol && forceConsume) {
Sergey Mashkov0b12ffc2017-07-26 12:59:16 +03002037 buffer.position(buffer.position() + 1)
2038 }
2039
2040 rc != 0 && !eol && !last
Sergey Mashkov86f70612017-07-25 10:59:44 +03002041 }
2042
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002043 if (eol) return true
Sergey Mashkov86f70612017-07-25 10:59:44 +03002044
2045 return readUTF8LineToUtf8Suspend(out, limit, cr1, consumed1 + consumed0)
2046 }
2047
2048 private suspend fun readUTF8LineToUtf8Suspend(out: Appendable, limit: Int, cr0: Boolean, consumed0: Int): Boolean {
2049 var cr1 = cr0
2050 var consumed1 = 0
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002051 var eol = false
2052 var wrap = 0
Sergey Mashkov86f70612017-07-25 10:59:44 +03002053
Sergey Mashkov4cd00142017-09-05 19:25:32 +03002054 consumeEachBufferRangeSuspend { buffer, last ->
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002055 var forceConsume = false
Sergey Mashkov86f70612017-07-25 10:59:44 +03002056
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002057 val rc = buffer.decodeUTF8 { ch ->
Sergey Mashkov86f70612017-07-25 10:59:44 +03002058 when {
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002059 ch == '\r' -> {
Sergey Mashkov86f70612017-07-25 10:59:44 +03002060 cr1 = true
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002061 true
Sergey Mashkov86f70612017-07-25 10:59:44 +03002062 }
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002063 ch == '\n' -> {
2064 eol = true
2065 forceConsume = true
2066 false
Sergey Mashkov86f70612017-07-25 10:59:44 +03002067 }
2068 cr1 -> {
2069 cr1 = false
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002070 eol = true
2071 false
Sergey Mashkov86f70612017-07-25 10:59:44 +03002072 }
2073 else -> {
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002074 if (consumed1 == limit) throw BufferOverflowException()
2075 consumed1++
2076 out.append(ch)
2077 true
Sergey Mashkov86f70612017-07-25 10:59:44 +03002078 }
2079 }
2080 }
2081
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002082 if (cr1 && last) {
2083 eol = true
2084 }
2085
2086 if (eol && forceConsume) {
2087 buffer.position(buffer.position() + 1)
2088 }
2089
2090 wrap = maxOf(0, rc)
2091
2092 wrap == 0 && !eol && !last
Sergey Mashkov86f70612017-07-25 10:59:44 +03002093 }
2094
Sergey Mashkova9a936d2017-08-23 14:39:41 +03002095 if (wrap != 0) {
2096 if (!readSuspend(wrap)) {
2097
2098 }
2099
2100 return readUTF8LineToUtf8Suspend(out, limit, cr1, consumed1)
2101 }
2102
2103 return (consumed1 > 0 || consumed0 > 0 || eol)
Sergey Mashkov86f70612017-07-25 10:59:44 +03002104 }
2105
Sergey Mashkov86fe5912017-07-25 11:20:01 +03002106 suspend override fun <A : Appendable> readUTF8LineTo(out: A, limit: Int) = readUTF8LineToAscii(out, limit)
Sergey Mashkov86f70612017-07-25 10:59:44 +03002107
Roman Elizarov469cad32017-08-15 15:54:56 +03002108 private fun resumeReadOp() {
Sergey Mashkov98c87392017-11-20 17:58:28 +03002109 ReadOp.getAndSet(this, null)?.apply {
2110 val closedCause = closed?.cause
2111 when {
2112 closedCause != null -> resumeWithException(closedCause)
2113 else -> resume(true)
2114 }
2115 }
2116 }
2117
Sergey Mashkov060f36a2017-11-23 12:39:27 +03002118 private inline fun resumeReadOp(exception: () -> Throwable) {
2119 ReadOp.getAndSet(this, null)?.resumeWithException(exception())
Roman Elizarov469cad32017-08-15 15:54:56 +03002120 }
2121
Sergey Mashkov86f70612017-07-25 10:59:44 +03002122 private fun resumeWriteOp() {
Sergey Mashkovec2af2f2018-01-15 17:02:25 +03002123 while (true) {
2124 val writeOp = writeOp ?: return
Roman Elizarov469cad32017-08-15 15:54:56 +03002125 val closed = closed
Sergey Mashkov40f3fe52018-01-15 23:10:29 +03002126 if (closed == null && joining != null) {
2127 val state = state
2128 if (state is ReadWriteBufferState.Writing || state is ReadWriteBufferState.ReadingWriting) {
2129 } else if (state !== ReadWriteBufferState.Terminated) return
2130 }
Sergey Mashkovec2af2f2018-01-15 17:02:25 +03002131 if (WriteOp.compareAndSet(this, writeOp, null)) {
2132 if (closed == null) writeOp.resume(Unit) else writeOp.resumeWithException(closed.sendException)
2133 return
2134 }
Roman Elizarov469cad32017-08-15 15:54:56 +03002135 }
Sergey Mashkov86f70612017-07-25 10:59:44 +03002136 }
2137
Sergey Mashkov86fe5912017-07-25 11:20:01 +03002138 private fun resumeClosed(cause: Throwable?) {
Sergey Mashkov86f70612017-07-25 10:59:44 +03002139 ReadOp.getAndSet(this, null)?.let { c ->
2140 if (cause != null)
2141 c.resumeWithException(cause)
2142 else
Roman Elizarov469cad32017-08-15 15:54:56 +03002143 c.resume(state.capacity.availableForRead > 0)
Sergey Mashkov86f70612017-07-25 10:59:44 +03002144 }
2145
Sergey Mashkov9ef4e0c2017-12-06 17:09:01 +03002146 WriteOp.getAndSet(this, null)?.resumeWithException(cause ?:
Roman Elizarov339ccf32017-09-27 17:44:00 +03002147 ClosedWriteChannelException(DEFAULT_CLOSE_MESSAGE))
Sergey Mashkov86f70612017-07-25 10:59:44 +03002148 }
2149
Sergey Mashkov09b621e2017-12-22 20:42:18 +03002150 private suspend fun readSuspend(size: Int): Boolean {
2151 val capacity = state.capacity
2152 if (capacity.availableForRead >= size) return true
2153
2154 closed?.let { c ->
2155 if (c.cause != null) throw c.cause
2156 val afterCapacity = state.capacity
2157 val result = afterCapacity.flush() && afterCapacity.availableForRead >= size
2158 if (readOp != null) throw IllegalStateException("Read operation is already in progress")
2159 return result
2160 }
2161
2162 if (size == 1) return readSuspendImpl(1)
2163 return readSuspendLoop(size)
2164 }
2165
2166 private tailrec suspend fun readSuspendLoop(size: Int): Boolean {
Sergey Mashkove4d3ce72017-09-11 10:31:08 +03002167 val capacity = state.capacity
2168 if (capacity.availableForRead >= size) return true
Sergey Mashkov86f70612017-07-25 10:59:44 +03002169
2170 closed?.let { c ->
Sergey Mashkove4d3ce72017-09-11 10:31:08 +03002171 if (c.cause != null) throw c.cause
Sergey Mashkov32c90b12017-11-07 19:18:16 +03002172 val afterCapacity = state.capacity
Sergey Mashkov62deb432017-11-09 19:50:26 +03002173 val result = afterCapacity.flush() && afterCapacity.availableForRead >= size
2174 if (readOp != null) throw IllegalStateException("Read operation is already in progress")
2175 return result
Sergey Mashkov86f70612017-07-25 10:59:44 +03002176 }
2177
2178 if (!readSuspendImpl(size)) return false
2179
Sergey Mashkov09b621e2017-12-22 20:42:18 +03002180 return readSuspendLoop(size)
2181 }
2182
2183 private val readSuspendContinuationCache = MutableDelegateContinuation<Boolean>()
2184
Sergey Mashkovd1842c62018-01-23 23:16:54 +03002185 @Suppress("NOTHING_TO_INLINE")
2186 private inline fun readSuspendPredicate(size: Int): Boolean {
2187 val state = state
2188
2189 if (state.capacity.availableForRead >= size) return false
2190 if (joining != null && writeOp != null &&
2191 (state === ReadWriteBufferState.IdleEmpty || state is ReadWriteBufferState.IdleNonEmpty)) return false
2192
2193 return true
2194 }
2195
Sergey Mashkov09b621e2017-12-22 20:42:18 +03002196 private fun suspensionForSize(size: Int, c: Continuation<Boolean>): Any {
2197 do {
Sergey Mashkovd1842c62018-01-23 23:16:54 +03002198 if (!readSuspendPredicate(size)) {
Sergey Mashkov09b621e2017-12-22 20:42:18 +03002199 c.resume(true)
2200 break
2201 }
2202
2203 closed?.let {
2204 if (it.cause != null) {
2205 c.resumeWithException(it.cause)
2206 } else {
2207 c.resume(state.capacity.flush() && state.capacity.availableForRead >= size)
2208 }
2209 return COROUTINE_SUSPENDED
2210 }
Sergey Mashkovd1842c62018-01-23 23:16:54 +03002211 } while (!setContinuation({ readOp }, ReadOp, c, { closed == null && readSuspendPredicate(size) }))
Sergey Mashkov09b621e2017-12-22 20:42:18 +03002212
2213 return COROUTINE_SUSPENDED
Sergey Mashkov86f70612017-07-25 10:59:44 +03002214 }
2215
2216 private suspend fun readSuspendImpl(size: Int): Boolean {
Sergey Mashkovd1842c62018-01-23 23:16:54 +03002217 if (!readSuspendPredicate(size)) return true
Sergey Mashkov86f70612017-07-25 10:59:44 +03002218
Sergey Mashkov09b621e2017-12-22 20:42:18 +03002219 return suspendCoroutineOrReturn { raw ->
2220 val c = readSuspendContinuationCache
2221 suspensionForSize(size, c)
2222 c.swap(raw)
Sergey Mashkov86f70612017-07-25 10:59:44 +03002223 }
2224 }
2225
Sergey Mashkovd1842c62018-01-23 23:16:54 +03002226 private fun shouldResumeReadOp() = joining != null &&
2227 (state === ReadWriteBufferState.IdleEmpty || state is ReadWriteBufferState.IdleNonEmpty)
2228
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03002229 private fun writeSuspendPredicate(size: Int): Boolean {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03002230 val joined = joining
Sergey Mashkov33d5d132017-11-10 10:28:57 +03002231 val state = state
2232 val closed = closed
2233
Sergey Mashkov39a88542017-12-19 18:41:34 +03002234 return when {
2235 closed != null -> false
2236 joined == null -> state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty
Sergey Mashkov40f3fe52018-01-15 23:10:29 +03002237 else -> state !== ReadWriteBufferState.Terminated && state !is ReadWriteBufferState.Writing && state !is ReadWriteBufferState.ReadingWriting
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03002238 }
2239 }
2240
Sergey Mashkov09b621e2017-12-22 20:42:18 +03002241 private val writeSuspendContinuationCache = MutableDelegateContinuation<Unit>()
2242 @Volatile
2243 private var writeSuspensionSize: Int = 0
2244 private val writeSuspension = { c: Continuation<Unit> ->
2245 val size = writeSuspensionSize
2246
2247 do {
2248 closed?.sendException?.let { throw it }
2249 if (!writeSuspendPredicate(size)) {
2250 c.resume(Unit)
2251 break
2252 }
2253 } while (!setContinuation({ writeOp }, WriteOp, c, { writeSuspendPredicate(size) }))
2254
2255 flushImpl(1, minWriteSize = size)
2256
Sergey Mashkovd1842c62018-01-23 23:16:54 +03002257 if (shouldResumeReadOp()) {
2258 resumeReadOp()
2259 }
2260
Sergey Mashkov09b621e2017-12-22 20:42:18 +03002261 COROUTINE_SUSPENDED
2262 }
2263
2264 private suspend fun tryWriteSuspend(size: Int) {
2265 if (!writeSuspendPredicate(size)) {
2266 closed?.sendException?.let { throw it }
2267 return
2268 }
2269
2270 writeSuspensionSize = size
2271 if (attachedJob != null) {
2272 return suspendCoroutineOrReturn(writeSuspension)
2273 }
2274
2275 return suspendCoroutineOrReturn { raw ->
2276 val c = writeSuspendContinuationCache
2277 writeSuspension(c)
2278 c.swap(raw)
2279 }
2280 }
2281
Sergey Mashkov86fe5912017-07-25 11:20:01 +03002282 private suspend fun writeSuspend(size: Int) {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03002283 while (writeSuspendPredicate(size)) {
Sergey Mashkov86f70612017-07-25 10:59:44 +03002284 suspendCancellableCoroutine<Unit>(holdCancellability = true) { c ->
2285 do {
2286 closed?.sendException?.let { throw it }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03002287 if (!writeSuspendPredicate(size)) {
Sergey Mashkov86f70612017-07-25 10:59:44 +03002288 c.resume(Unit)
2289 break
2290 }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03002291 } while (!setContinuation({ writeOp }, WriteOp, c, { writeSuspendPredicate(size) }))
Sergey Mashkov86f70612017-07-25 10:59:44 +03002292
Sergey Mashkove6764082017-12-05 12:27:09 +03002293 flushImpl(1, minWriteSize = size)
Sergey Mashkovd1842c62018-01-23 23:16:54 +03002294
2295 if (shouldResumeReadOp()) {
2296 resumeReadOp()
2297 }
Sergey Mashkov86f70612017-07-25 10:59:44 +03002298 }
2299 }
2300
2301 closed?.sendException?.let { throw it }
2302 }
2303
Sergey Mashkov09b621e2017-12-22 20:42:18 +03002304 private inline fun <T, C : Continuation<T>> setContinuation(getter: () -> C?, updater: AtomicReferenceFieldUpdater<ByteBufferChannel, C?>, continuation: C, predicate: () -> Boolean): Boolean {
Sergey Mashkov86f70612017-07-25 10:59:44 +03002305 while (true) {
2306 val current = getter()
2307 if (current != null) throw IllegalStateException("Operation is already in progress")
2308
2309 if (!predicate()) {
2310 return false
2311 }
2312
2313 if (updater.compareAndSet(this, null, continuation)) {
2314 if (predicate() || !updater.compareAndSet(this, continuation, null)) {
Sergey Mashkov09b621e2017-12-22 20:42:18 +03002315 if (attachedJob == null && continuation is CancellableContinuation<*>) {
2316 continuation.initCancellability()
2317 }
Sergey Mashkov86f70612017-07-25 10:59:44 +03002318 return true
2319 }
2320
2321 return false
2322 }
2323 }
2324 }
2325
2326 private fun newBuffer(): ReadWriteBufferState.Initial {
2327 val result = pool.borrow()
2328
Sergey Mashkovf774f372017-10-09 12:09:16 +03002329 result.readBuffer.order(readByteOrder.nioOrder)
2330 result.writeBuffer.order(writeByteOrder.nioOrder)
Roman Elizarov469cad32017-08-15 15:54:56 +03002331 result.capacity.resetForWrite()
Sergey Mashkov86f70612017-07-25 10:59:44 +03002332
2333 return result
2334 }
2335
2336 private fun releaseBuffer(buffer: ReadWriteBufferState.Initial) {
2337 pool.recycle(buffer)
2338 }
2339
Sergey Mashkov39a88542017-12-19 18:41:34 +03002340 private inline fun updateStateAndGet(block: (ReadWriteBufferState) -> ReadWriteBufferState?): ReadWriteBufferState {
2341 val updater = State
2342 while (true) {
2343 val old = state
2344 val newState = block(old) ?: continue
2345 if (old === newState || updater.compareAndSet(this, old, newState)) return newState
2346 }
2347 }
2348
Sergey Mashkov86f70612017-07-25 10:59:44 +03002349 companion object {
Roman Elizarov83762032017-08-08 19:58:40 +03002350
Sergey Mashkov86f70612017-07-25 10:59:44 +03002351 private const val ReservedLongIndex = -8
2352
Roman Elizarov469cad32017-08-15 15:54:56 +03002353 // todo: replace with atomicfu, remove companion object
Sergey Mashkov86f70612017-07-25 10:59:44 +03002354 private val State = updater(ByteBufferChannel::state)
Sergey Mashkov86f70612017-07-25 10:59:44 +03002355 private val WriteOp = updater(ByteBufferChannel::writeOp)
2356 private val ReadOp = updater(ByteBufferChannel::readOp)
Roman Elizarov469cad32017-08-15 15:54:56 +03002357 private val Closed = updater(ByteBufferChannel::closed)
Sergey Mashkov86f70612017-07-25 10:59:44 +03002358 }
2359
Sergey Mashkov4cd00142017-09-05 19:25:32 +03002360 private object TerminatedLookAhead : LookAheadSuspendSession {
2361 override fun consumed(n: Int) {
2362 if (n > 0) throw IllegalStateException("Unable to mark $n bytes consumed for already terminated channel")
2363 }
2364
2365 override fun request(skip: Int, atLeast: Int) = null
2366
Sergey Mashkov09b621e2017-12-22 20:42:18 +03002367 suspend override fun awaitAtLeast(n: Int): Boolean {
2368 return false
Sergey Mashkov4cd00142017-09-05 19:25:32 +03002369 }
2370 }
2371
Sergey Mashkov86fe5912017-07-25 11:20:01 +03002372 private class ClosedElement(val cause: Throwable?) {
Sergey Mashkov86f70612017-07-25 10:59:44 +03002373 val sendException: Throwable
Roman Elizarov469cad32017-08-15 15:54:56 +03002374 get() = cause ?: ClosedWriteChannelException("The channel was closed")
Sergey Mashkov86f70612017-07-25 10:59:44 +03002375
2376 companion object {
2377 val EmptyCause = ClosedElement(null)
2378 }
2379 }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03002380
Sergey Mashkov78317162018-03-21 10:12:40 +03002381 internal class JoiningState(val delegatedTo: ByteBufferChannel, val delegateClose: Boolean, val delegateFlush: Boolean) {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03002382 private val _closeWaitJob = atomic<Job?>(null)
2383 private val closed = atomic(0)
2384
2385 private val closeWaitJob: Job
2386 get() {
2387 while (true) {
2388 val current = _closeWaitJob.value
2389 if (current != null) return current
2390 val newJob = Job()
Sergey Mashkov47d92ba2017-11-17 19:30:22 +03002391 if (_closeWaitJob.compareAndSet(null, newJob)) {
2392 if (closed.value == 1) newJob.cancel()
2393 return newJob
2394 }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03002395 }
2396 }
2397
2398 fun complete() {
2399 closed.value = 1
2400 _closeWaitJob.getAndSet(null)?.cancel()
2401 }
2402
2403 suspend fun awaitClose() {
2404 if (closed.value == 1) return
2405 return closeWaitJob.join()
2406 }
2407 }
Sergey Mashkov86f70612017-07-25 10:59:44 +03002408}
Roman Elizarov469cad32017-08-15 15:54:56 +03002409