/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.experimental.io

import kotlinx.coroutines.experimental.io.internal.*
import kotlinx.coroutines.experimental.io.packet.*
import kotlinx.coroutines.experimental.io.packet.ByteReadPacket
import kotlinx.io.core.*

/**
 * Channel for asynchronous reading of sequences of bytes.
 * This is a **single-reader channel**.
 *
 * Operations on this channel cannot be invoked concurrently.
 */
public interface ByteReadChannel {
    /**
     * Returns number of bytes that can be read without suspension. Read operations do no suspend and return
     * immediately when this number is at least the number of bytes requested for read.
     */
    public val availableForRead: Int

    /**
     * Returns `true` if the channel is closed and no remaining bytes are available for read.
     * It implies that [availableForRead] is zero.
     */
    public val isClosedForRead: Boolean

    public val isClosedForWrite: Boolean

    /**
     * Byte order that is used for multi-byte read operations
     * (such as [readShort], [readInt], [readLong], [readFloat], and [readDouble]).
     */
    public var readByteOrder: ByteOrder

    /**
     * Number of bytes read from the channel.
     * It is not guaranteed to be atomic so could be updated in the middle of long running read operation.
     */
    public val totalBytesRead: Long

    /**
     * Reads all available bytes to [dst] buffer and returns immediately or suspends if no bytes available
     * @return number of bytes were read or `-1` if the channel has been closed
     */
    suspend fun readAvailable(dst: ByteArray, offset: Int, length: Int): Int
    suspend fun readAvailable(dst: ByteArray) = readAvailable(dst, 0, dst.size)
    suspend fun readAvailable(dst: ByteBuffer): Int
    suspend fun readAvailable(dst: BufferView): Int

    /**
     * Reads all [length] bytes to [dst] buffer or fails if channel has been closed.
     * Suspends if not enough bytes available.
     */
    suspend fun readFully(dst: ByteArray, offset: Int, length: Int)
    suspend fun readFully(dst: ByteArray) = readFully(dst, 0, dst.size)
    suspend fun readFully(dst: ByteBuffer): Int

    /**
     * Reads the specified amount of bytes and makes a byte packet from them. Fails if channel has been closed
     * and not enough bytes available. Accepts [headerSizeHint] to be provided, see [WritePacket].
     */
    suspend fun readPacket(size: Int, headerSizeHint: Int = 0): ByteReadPacket

    /**
     * Reads a long number (suspending if not enough bytes available) or fails if channel has been closed
     * and not enough bytes.
     */
    suspend fun readLong(): Long

    /**
     * Reads an int number (suspending if not enough bytes available) or fails if channel has been closed
     * and not enough bytes.
     */
    suspend fun readInt(): Int

    /**
     * Reads a short number (suspending if not enough bytes available) or fails if channel has been closed
     * and not enough bytes.
     */
    suspend fun readShort(): Short

    /**
     * Reads a byte (suspending if no bytes available yet) or fails if channel has been closed
     * and not enough bytes.
     */
    suspend fun readByte(): Byte

    /**
     * Reads a boolean value (suspending if no bytes available yet) or fails if channel has been closed
     * and not enough bytes.
     */
    suspend fun readBoolean(): Boolean

    /**
     * Reads double number (suspending if not enough bytes available) or fails if channel has been closed
     * and not enough bytes.
     */
    suspend fun readDouble(): Double

    /**
     * Reads float number (suspending if not enough bytes available) or fails if channel has been closed
     * and not enough bytes.
     */
    suspend fun readFloat(): Float

    /**
     * For every available bytes range invokes [visitor] function until it return false or end of stream encountered
     */
    suspend fun consumeEachBufferRange(visitor: ConsumeEachBufferVisitor)

    fun <R> lookAhead(visitor: LookAheadSession.() -> R): R
    suspend fun <R> lookAheadSuspend(visitor: suspend LookAheadSuspendSession.() -> R): R

    /**
     * Reads a line of UTF-8 characters to the specified [out] buffer up to [limit] characters.
     * Supports both CR-LF and LF line endings.
     * Throws an exception if the specified [limit] has been exceeded.
     *
     * @return `true` if line has been read (possibly empty) or `false` if channel has been closed
     * and no characters were read.
     */
    suspend fun <A : Appendable> readUTF8LineTo(out: A, limit: Int = Int.MAX_VALUE): Boolean

    /**
     * Invokes [block] when it will be possible to read at least [min] bytes
     * providing byte buffer to it so lambda can read from the buffer
     * up to [ByteBuffer.remaining] bytes. If there are no [min] bytes available then the invocation could
     * suspend until the requirement will be met.
     *
     * If [min] is zero then the invocation will suspend until at least one byte available.
     *
     * Warning: it is not guaranteed that all of remaining bytes will be represented as a single byte buffer
     * eg: it could be 4 bytes available for read but the provided byte buffer could have only 2 remaining bytes:
     * in this case you have to invoke read again (with decreased [min] accordingly).
     *
     * It will fail with [EOFException] if not enough bytes ([availableForRead] < [min]) available
     * in the channel after it is closed.
     *
     * [block] lambda should modify buffer's position accordingly. It also could temporarily modify limit however
     * it should restore it before return. It is not recommended to access any bytes of the buffer outside of the
     * provided byte range [position(); limit()) as there could be any garbage or incomplete data.
     *
     * @param min amount of bytes available for read, should be positive or zero
     * @param block to be invoked when at least [min] bytes available for read
     */
    suspend fun read(min: Int = 1, block: (ByteBuffer) -> Unit)

    /**
     * Close channel with optional [cause] cancellation. Unlike [ByteWriteChannel.close] that could close channel
     * normally, cancel does always close with error so any operations on this channel will always fail
     * and all suspensions will be resumed with exception.
     *
     * Please note that if the channel has been provided by [reader] or [writer] then the corresponding owning
     * coroutine will be cancelled as well
     *
     * @see ByteWriteChannel.close
     */
    fun cancel(cause: Throwable? = null): Boolean

    /**
     * Discard up to [max] bytes
     *
     * @return number of bytes were discarded
     */
    suspend fun discard(max: Long = Long.MAX_VALUE): Long
}

typealias ConsumeEachBufferVisitor = (buffer: java.nio.ByteBuffer, last: Boolean) -> Boolean

suspend fun ByteReadChannel.joinTo(dst: ByteWriteChannel, closeOnEnd: Boolean, flushOnEnd: Boolean = true) {
    require(dst !== this)

    if (this is ByteBufferChannel && dst is ByteBufferChannel) {
        return dst.joinFrom(this, closeOnEnd, flushOnEnd)
    }

    return joinToImplSuspend(dst, closeOnEnd, flushOnEnd)
}

private suspend fun ByteReadChannel.joinToImplSuspend(dst: ByteWriteChannel, close: Boolean, flush: Boolean) {
    copyToImpl(dst, Long.MAX_VALUE)
    if (close) {
        dst.close()
    } else if (flush) {
        dst.flush()
    }
}

/**
 * Reads up to [limit] bytes from receiver channel and writes them to [dst] channel.
 * Closes [dst] channel if fails to read or write with cause exception.
 * @return a number of copied bytes
 */
suspend fun ByteReadChannel.copyTo(dst: ByteWriteChannel, limit: Long = Long.MAX_VALUE): Long {
    require(this !== dst)
    require(limit >= 0L)

    if (this is ByteBufferChannel && dst is ByteBufferChannel) {
        return dst.copyDirect(this, limit, null)
    }

    return copyToImpl(dst, limit)
}

private suspend fun ByteReadChannel.copyToImpl(dst: ByteWriteChannel, limit: Long): Long {
    val buffer = BufferPool.borrow()
    val dstNeedsFlush = !dst.autoFlush

    try {
        var copied = 0L

        while (true) {
            buffer.clear()

            val bufferLimit = limit - copied
            if (bufferLimit <= 0) break
            if (bufferLimit < buffer.limit()) {
                buffer.limit(bufferLimit.toInt())
            }
            val size = readAvailable(buffer)
            if (size == -1) break

            buffer.flip()
            dst.writeFully(buffer)
            copied += size

            if (dstNeedsFlush && availableForRead == 0) {
                dst.flush()
            }
        }
        return copied
    } catch (t: Throwable) {
        dst.close(t)
        throw t
    } finally {
        BufferPool.recycle(buffer)
    }
}

/**
 * Reads all the bytes from receiver channel and writes them to [dst] channel and then closes it.
 * Closes [dst] channel if fails to read or write with cause exception.
 * @return a number of copied bytes
 */
suspend fun ByteReadChannel.copyAndClose(dst: ByteWriteChannel, limit: Long = Long.MAX_VALUE): Long {
    val count = copyTo(dst, limit)
    dst.close()
    return count
}

/**
 * Reads all the bytes from receiver channel and builds a packet that is returned unless the specified [limit] exceeded.
 * It will simply stop reading and return packet of size [limit] in this case
 */
suspend fun ByteReadChannel.readRemaining(limit: Int = Int.MAX_VALUE): ByteReadPacket {
    val buffer = BufferPool.borrow()
    val packet = WritePacket()

    try {
        var copied = 0L

        while (copied < limit) {
            buffer.clear()
            if (limit - copied < buffer.limit()) {
                buffer.limit((limit - copied).toInt())
            }
            val size = readAvailable(buffer)
            if (size == -1) break

            buffer.flip()
            packet.writeFully(buffer)
            copied += size
        }

        return packet.build()
    } catch (t: Throwable) {
        packet.release()
        throw t
    } finally {
        BufferPool.recycle(buffer)
    }
}
