blob: dce4847d51a590e3e93825ac6998eeb5b86a1838 [file] [log] [blame]
package kotlinx.coroutines.experimental.io
import kotlinx.coroutines.experimental.io.internal.*
import java.io.*
/**
* Reads from the channel to the specified [dst] byte buffer until one of the following:
* - channel's end
* - [dst] capacity exceeded
* - [delimiter] bytes encountered
*
* If [delimiter] bytes encountered then these bytes remain unconsumed
*
* @return non-negative number of copied bytes, possibly 0
*/
suspend fun ByteReadChannel.readUntilDelimiter(delimiter: ByteBuffer, dst: ByteBuffer): Int {
require(delimiter.hasRemaining())
require(delimiter !== dst)
var copied = 0
var endFound = false
lookAhead {
do {
val rc = tryCopyUntilDelimiter(delimiter, dst)
if (rc == 0) break
val size = if (rc < 0) {
endFound = true
-rc
} else rc
copied += size
} while (dst.hasRemaining() && !endFound)
}
return if (copied == 0 && isClosedForRead) -1
else if (!dst.hasRemaining() || endFound) copied
else readUntilDelimiterSuspend(delimiter, dst, copied)
}
suspend fun ByteReadChannel.skipDelimiter(delimiter: ByteBuffer) {
require(delimiter.hasRemaining())
var found = false
lookAhead {
found = tryEnsureDelimiter(delimiter) == delimiter.remaining()
}
if (!found) {
skipDelimiterSuspend(delimiter)
}
}
private suspend fun ByteReadChannel.skipDelimiterSuspend(delimiter: ByteBuffer) {
lookAheadSuspend {
awaitAtLeast(delimiter.remaining())
if (tryEnsureDelimiter(delimiter) != delimiter.remaining()) throw IOException("Broken delimiter occurred")
}
}
private suspend fun ByteReadChannel.readUntilDelimiterSuspend(delimiter: ByteBuffer, dst: ByteBuffer, copied0: Int): Int {
require(delimiter !== dst)
require(copied0 >= 0)
var endFound = false
val copied = lookAheadSuspend {
var copied = copied0
do {
awaitAtLeast(1)
val rc = tryCopyUntilDelimiter(delimiter, dst)
if (rc == 0) {
if (startsWithDelimiter(delimiter) == delimiter.remaining()) {
endFound = true
break
}
if (isClosedForWrite) {
break
} else {
awaitAtLeast(delimiter.remaining())
continue
}
}
val size = if (rc <= 0) {
endFound = true
-rc
} else rc
copied += size
} while (dst.hasRemaining() && !endFound)
copied
}
return when {
copied > 0 && isClosedForWrite && !endFound -> copied + readAvailable(dst).coerceAtLeast(0)
copied == 0 && isClosedForRead -> -1
else -> copied
}
}
/**
* @return a positive number of bytes copied if no [delimiter] found yet or a negated number of bytes copied if
* the delimited has been found, or 0 if no buffer available (not yet ready or EOF)
*/
private fun LookAheadSession.tryCopyUntilDelimiter(delimiter: ByteBuffer, dst: ByteBuffer): Int {
var endFound = false
val buffer = request(0, 1) ?: return 0
val index = buffer.indexOfPartial(delimiter)
val size = if (index != -1) {
val found = minOf(buffer.remaining() - index, delimiter.remaining())
val notKnown = delimiter.remaining() - found
if (notKnown == 0) {
endFound = true
dst.putLimited(buffer, buffer.position() + index)
} else {
val remembered = buffer.duplicate()
val next = request(index + found, 1)
if (next == null) {
dst.putLimited(remembered, remembered.position() + index)
} else if (next.startsWith(delimiter, found)) {
if (next.remaining() >= notKnown) {
endFound = true
dst.putLimited(remembered, remembered.position() + index)
} else {
dst.putLimited(remembered, remembered.position() + index)
}
} else {
dst.putLimited(remembered, remembered.position() + index + 1)
}
}
} else {
dst.putAtMost(buffer)
}
consumed(size)
return if (endFound) -size else size
}
private fun LookAheadSession.tryEnsureDelimiter(delimiter: ByteBuffer): Int {
val found = startsWithDelimiter(delimiter)
if (found == -1) throw IOException("Failed to skip delimiter: actual bytes differ from delimiter bytes")
if (found < delimiter.remaining()) return found
consumed(delimiter.remaining())
return delimiter.remaining()
}
/**
* @return Number of bytes of the delimiter found (possibly 0 if no bytes available yet) or -1 if it doesn't start
*/
private fun LookAheadSession.startsWithDelimiter(delimiter: ByteBuffer): Int {
val buffer = request(0, 1) ?: return 0
val index = buffer.indexOfPartial(delimiter)
if (index != 0) return -1
val found = minOf(buffer.remaining() - index, delimiter.remaining())
val notKnown = delimiter.remaining() - found
if (notKnown > 0) {
val next = request(index + found, notKnown) ?: return found
if (!next.startsWith(delimiter, found)) return -1
}
return delimiter.remaining()
}