blob: 6e35f6a6671b04d552eacff30f3536b5cca34554 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.experimental.io
import kotlinx.coroutines.experimental.io.internal.*
import kotlinx.coroutines.experimental.io.packet.*
import kotlinx.coroutines.experimental.io.packet.ByteReadPacket
import kotlinx.io.core.*
/**
* Channel for asynchronous reading of sequences of bytes.
* This is a **single-reader channel**.
*
* Operations on this channel cannot be invoked concurrently.
*/
public interface ByteReadChannel {
/**
* Returns number of bytes that can be read without suspension. Read operations do no suspend and return
* immediately when this number is at least the number of bytes requested for read.
*/
public val availableForRead: Int
/**
* Returns `true` if the channel is closed and no remaining bytes are available for read.
* It implies that [availableForRead] is zero.
*/
public val isClosedForRead: Boolean
public val isClosedForWrite: Boolean
/**
* Byte order that is used for multi-byte read operations
* (such as [readShort], [readInt], [readLong], [readFloat], and [readDouble]).
*/
public var readByteOrder: ByteOrder
/**
* Number of bytes read from the channel.
* It is not guaranteed to be atomic so could be updated in the middle of long running read operation.
*/
public val totalBytesRead: Long
/**
* Reads all available bytes to [dst] buffer and returns immediately or suspends if no bytes available
* @return number of bytes were read or `-1` if the channel has been closed
*/
suspend fun readAvailable(dst: ByteArray, offset: Int, length: Int): Int
suspend fun readAvailable(dst: ByteArray) = readAvailable(dst, 0, dst.size)
suspend fun readAvailable(dst: ByteBuffer): Int
suspend fun readAvailable(dst: BufferView): Int
/**
* Reads all [length] bytes to [dst] buffer or fails if channel has been closed.
* Suspends if not enough bytes available.
*/
suspend fun readFully(dst: ByteArray, offset: Int, length: Int)
suspend fun readFully(dst: ByteArray) = readFully(dst, 0, dst.size)
suspend fun readFully(dst: ByteBuffer): Int
/**
* Reads the specified amount of bytes and makes a byte packet from them. Fails if channel has been closed
* and not enough bytes available. Accepts [headerSizeHint] to be provided, see [WritePacket].
*/
suspend fun readPacket(size: Int, headerSizeHint: Int = 0): ByteReadPacket
/**
* Reads a long number (suspending if not enough bytes available) or fails if channel has been closed
* and not enough bytes.
*/
suspend fun readLong(): Long
/**
* Reads an int number (suspending if not enough bytes available) or fails if channel has been closed
* and not enough bytes.
*/
suspend fun readInt(): Int
/**
* Reads a short number (suspending if not enough bytes available) or fails if channel has been closed
* and not enough bytes.
*/
suspend fun readShort(): Short
/**
* Reads a byte (suspending if no bytes available yet) or fails if channel has been closed
* and not enough bytes.
*/
suspend fun readByte(): Byte
/**
* Reads a boolean value (suspending if no bytes available yet) or fails if channel has been closed
* and not enough bytes.
*/
suspend fun readBoolean(): Boolean
/**
* Reads double number (suspending if not enough bytes available) or fails if channel has been closed
* and not enough bytes.
*/
suspend fun readDouble(): Double
/**
* Reads float number (suspending if not enough bytes available) or fails if channel has been closed
* and not enough bytes.
*/
suspend fun readFloat(): Float
/**
* For every available bytes range invokes [visitor] function until it return false or end of stream encountered
*/
suspend fun consumeEachBufferRange(visitor: ConsumeEachBufferVisitor)
fun <R> lookAhead(visitor: LookAheadSession.() -> R): R
suspend fun <R> lookAheadSuspend(visitor: suspend LookAheadSuspendSession.() -> R): R
/**
* Reads a line of UTF-8 characters to the specified [out] buffer up to [limit] characters.
* Supports both CR-LF and LF line endings.
* Throws an exception if the specified [limit] has been exceeded.
*
* @return `true` if line has been read (possibly empty) or `false` if channel has been closed
* and no characters were read.
*/
suspend fun <A : Appendable> readUTF8LineTo(out: A, limit: Int = Int.MAX_VALUE): Boolean
/**
* Invokes [block] when it will be possible to read at least [min] bytes
* providing byte buffer to it so lambda can read from the buffer
* up to [ByteBuffer.remaining] bytes. If there are no [min] bytes available then the invocation could
* suspend until the requirement will be met.
*
* If [min] is zero then the invocation will suspend until at least one byte available.
*
* Warning: it is not guaranteed that all of remaining bytes will be represented as a single byte buffer
* eg: it could be 4 bytes available for read but the provided byte buffer could have only 2 remaining bytes:
* in this case you have to invoke read again (with decreased [min] accordingly).
*
* It will fail with [EOFException] if not enough bytes ([availableForRead] < [min]) available
* in the channel after it is closed.
*
* [block] lambda should modify buffer's position accordingly. It also could temporarily modify limit however
* it should restore it before return. It is not recommended to access any bytes of the buffer outside of the
* provided byte range [position(); limit()) as there could be any garbage or incomplete data.
*
* @param min amount of bytes available for read, should be positive or zero
* @param block to be invoked when at least [min] bytes available for read
*/
suspend fun read(min: Int = 1, block: (ByteBuffer) -> Unit)
/**
* Close channel with optional [cause] cancellation. Unlike [ByteWriteChannel.close] that could close channel
* normally, cancel does always close with error so any operations on this channel will always fail
* and all suspensions will be resumed with exception.
*
* Please note that if the channel has been provided by [reader] or [writer] then the corresponding owning
* coroutine will be cancelled as well
*
* @see ByteWriteChannel.close
*/
fun cancel(cause: Throwable? = null): Boolean
/**
* Discard up to [max] bytes
*
* @return number of bytes were discarded
*/
suspend fun discard(max: Long = Long.MAX_VALUE): Long
}
typealias ConsumeEachBufferVisitor = (buffer: java.nio.ByteBuffer, last: Boolean) -> Boolean
suspend fun ByteReadChannel.joinTo(dst: ByteWriteChannel, closeOnEnd: Boolean, flushOnEnd: Boolean = true) {
require(dst !== this)
if (this is ByteBufferChannel && dst is ByteBufferChannel) {
return dst.joinFrom(this, closeOnEnd, flushOnEnd)
}
return joinToImplSuspend(dst, closeOnEnd, flushOnEnd)
}
private suspend fun ByteReadChannel.joinToImplSuspend(dst: ByteWriteChannel, close: Boolean, flush: Boolean) {
copyToImpl(dst, Long.MAX_VALUE)
if (close) {
dst.close()
} else if (flush) {
dst.flush()
}
}
/**
* Reads up to [limit] bytes from receiver channel and writes them to [dst] channel.
* Closes [dst] channel if fails to read or write with cause exception.
* @return a number of copied bytes
*/
suspend fun ByteReadChannel.copyTo(dst: ByteWriteChannel, limit: Long = Long.MAX_VALUE): Long {
require(this !== dst)
require(limit >= 0L)
if (this is ByteBufferChannel && dst is ByteBufferChannel) {
return dst.copyDirect(this, limit, null)
}
return copyToImpl(dst, limit)
}
private suspend fun ByteReadChannel.copyToImpl(dst: ByteWriteChannel, limit: Long): Long {
val buffer = BufferPool.borrow()
val dstNeedsFlush = !dst.autoFlush
try {
var copied = 0L
while (true) {
buffer.clear()
val bufferLimit = limit - copied
if (bufferLimit <= 0) break
if (bufferLimit < buffer.limit()) {
buffer.limit(bufferLimit.toInt())
}
val size = readAvailable(buffer)
if (size == -1) break
buffer.flip()
dst.writeFully(buffer)
copied += size
if (dstNeedsFlush && availableForRead == 0) {
dst.flush()
}
}
return copied
} catch (t: Throwable) {
dst.close(t)
throw t
} finally {
BufferPool.recycle(buffer)
}
}
/**
* Reads all the bytes from receiver channel and writes them to [dst] channel and then closes it.
* Closes [dst] channel if fails to read or write with cause exception.
* @return a number of copied bytes
*/
suspend fun ByteReadChannel.copyAndClose(dst: ByteWriteChannel, limit: Long = Long.MAX_VALUE): Long {
val count = copyTo(dst, limit)
dst.close()
return count
}
/**
* Reads all the bytes from receiver channel and builds a packet that is returned unless the specified [limit] exceeded.
* It will simply stop reading and return packet of size [limit] in this case
*/
suspend fun ByteReadChannel.readRemaining(limit: Int = Int.MAX_VALUE): ByteReadPacket {
val buffer = BufferPool.borrow()
val packet = WritePacket()
try {
var copied = 0L
while (copied < limit) {
buffer.clear()
if (limit - copied < buffer.limit()) {
buffer.limit((limit - copied).toInt())
}
val size = readAvailable(buffer)
if (size == -1) break
buffer.flip()
packet.writeFully(buffer)
copied += size
}
return packet.build()
} catch (t: Throwable) {
packet.release()
throw t
} finally {
BufferPool.recycle(buffer)
}
}