Merge branch 'develop' into byte-channel-re
diff --git a/integration/kotlinx-coroutines-io/pom.xml b/integration/kotlinx-coroutines-io/pom.xml
new file mode 100644
index 0000000..c736bad
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>kotlinx-coroutines</artifactId>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>kotlinx-coroutines-io</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-test-junit</artifactId>
+ <version>${kotlin.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/kotlin</sourceDirectory>
+ <testSourceDirectory>src/test/kotlin</testSourceDirectory>
+ </build>
+</project>
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBuffer.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBuffer.kt
new file mode 100644
index 0000000..42eecf4
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBuffer.kt
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.io
+
+/**
+ * Byte buffer.
+ */
+typealias ByteBuffer = java.nio.ByteBuffer
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
new file mode 100644
index 0000000..1486f22
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
@@ -0,0 +1,1308 @@
+@file:Suppress("UsePropertyAccessSyntax") // for ByteBuffer.getShort/getInt/etc
+
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.io.internal.*
+import kotlinx.coroutines.experimental.io.packet.*
+import java.nio.*
+import java.util.concurrent.atomic.*
+
+// implementation for ByteChannel
+internal class ByteBufferChannel(
+ override val autoFlush: Boolean,
+ private val pool: ObjectPool<ReadWriteBufferState.Initial> = BufferObjectPool,
+ private val reservedSize: Int = RESERVED_SIZE
+) : ByteChannel {
+ // internal constructor for reading of byte buffers
+ constructor(content: ByteBuffer) : this(false, BufferObjectNoPool, 0) {
+ state = ReadWriteBufferState.Initial(content.slice(), 0).apply {
+ capacity.resetForRead()
+ }.startWriting()
+ restoreStateAfterWrite()
+ close()
+ tryTerminate()
+ }
+
+ @Volatile
+ private var state: ReadWriteBufferState = ReadWriteBufferState.IdleEmpty
+
+ @Volatile
+ private var closed: ClosedElement? = null
+
+ @Volatile
+ private var readOp: CancellableContinuation<Boolean>? = null
+
+ @Volatile
+ private var writeOp: CancellableContinuation<Unit>? = null
+
+ private var readPosition = 0
+ private var writePosition = 0
+
+ override var readByteOrder: ByteOrder = ByteOrder.BIG_ENDIAN
+ override var writeByteOrder: ByteOrder = ByteOrder.BIG_ENDIAN
+
+ override val availableForRead: Int
+ get() = state.capacity.availableForRead
+
+ override val availableForWrite: Int
+ get() = state.capacity.availableForWrite
+
+ override val isClosedForRead: Boolean
+ get() = state === ReadWriteBufferState.Terminated
+
+ override val isClosedForWrite: Boolean
+ get() = closed != null
+
+ /**
+ * Closes the channel for writing, optionally with a failure [cause].
+ *
+ * Only the call that wins the compare-and-set on the close marker takes effect: it flushes,
+ * attempts termination (when the capacity reports empty or a [cause] was given) and resumes
+ * any suspended reader and writer via [resumeClosed].
+ *
+ * @return `true` if this call closed the channel, `false` if it was already closed.
+ */
+ override fun close(cause: Throwable?): Boolean {
+ if (closed != null) return false
+ val newClosed = if (cause == null) ClosedElement.EmptyCause else ClosedElement(cause)
+ // the CAS decides between concurrent close() calls: exactly one of them proceeds
+ if (!Closed.compareAndSet(this, null, newClosed)) return false
+ flush()
+ if (state.capacity.isEmpty() || cause != null) tryTerminate()
+ resumeClosed(cause)
+ return true
+ }
+
+ /**
+ * Flushes the current capacity. If `capacity.flush()` returns `false` nothing else happens;
+ * otherwise a suspended reader is resumed and, when there is space available for writing,
+ * a suspended writer as well.
+ */
+ override fun flush() {
+ if (!state.capacity.flush()) return
+ resumeReadOp()
+ if (availableForWrite > 0) resumeWriteOp()
+ }
+
+ private fun ByteBuffer.prepareBuffer(order: ByteOrder, position: Int, available: Int) {
+ require(position >= 0)
+ require(available >= 0)
+
+ val bufferLimit = capacity() - reservedSize
+ val virtualLimit = position + available
+
+ order(order)
+ limit(virtualLimit.coerceAtMost(bufferLimit))
+ position(position)
+ }
+
+    /**
+     * Moves the channel into a writing state and returns the write buffer positioned at
+     * [writePosition], with its limit covering the space currently available for writing.
+     *
+     * A buffer is borrowed from the pool when the channel is idle and empty. A borrowed buffer
+     * that ends up unused is handed back to the pool, including when the state transition
+     * fails with an exception.
+     *
+     * @throws Throwable the close exception if the channel is already terminated.
+     */
+    private fun setupStateForWrite(): ByteBuffer {
+        var preallocated: ReadWriteBufferState.Initial? = null
+
+        val (old, newState) = try {
+            updateState { state ->
+                when (state) {
+                    ReadWriteBufferState.IdleEmpty -> {
+                        // the lambda is re-run when the CAS fails: borrow at most once
+                        val allocated = preallocated ?: newBuffer().also { preallocated = it }
+                        allocated.startWriting()
+                    }
+                    ReadWriteBufferState.Terminated -> throw closed!!.sendException
+                    else -> state.startWriting()
+                }
+            }
+        } catch (t: Throwable) {
+            // a buffer borrowed by an earlier attempt would otherwise never return to the pool
+            preallocated?.let { releaseBuffer(it) }
+            throw t
+        }
+
+        // the borrowed buffer was not installed (the state was no longer IdleEmpty): give it back
+        preallocated?.let { allocated ->
+            if (old !== ReadWriteBufferState.IdleEmpty) {
+                releaseBuffer(allocated)
+            }
+        }
+
+        return newState.writeBuffer.apply {
+            prepareBuffer(writeByteOrder, writePosition, newState.capacity.availableForWrite)
+        }
+    }
+
+ private fun restoreStateAfterWrite() {
+ var toRelease: ReadWriteBufferState.IdleNonEmpty? = null
+
+ val (_, newState) = updateState {
+ val writeStopped = it.stopWriting()
+ if (writeStopped is ReadWriteBufferState.IdleNonEmpty && writeStopped.capacity.isEmpty()) {
+ toRelease = writeStopped
+ ReadWriteBufferState.IdleEmpty
+ } else {
+ writeStopped
+ }
+ }
+
+ if (newState === ReadWriteBufferState.IdleEmpty) {
+ toRelease?.let { releaseBuffer(it.initial) }
+ }
+ }
+
+ private fun setupStateForRead(): ByteBuffer? {
+ val (_, newState) = updateState { state ->
+ when (state) {
+ ReadWriteBufferState.Terminated -> closed!!.cause?.let { throw it } ?: return null
+ ReadWriteBufferState.IdleEmpty -> closed?.cause?.let { throw it } ?: return null
+ else -> {
+ if (state.capacity.availableForRead == 0) return null
+ state.startReading()
+ }
+ }
+ }
+
+ return newState.readBuffer.apply {
+ prepareBuffer(readByteOrder, readPosition, newState.capacity.availableForRead)
+ }
+ }
+
+ /**
+ * Leaves the reading state after a read attempt. When the resulting idle state could be locked
+ * for release, the channel state becomes [ReadWriteBufferState.IdleEmpty], the buffer is
+ * returned to the pool and a suspended writer is resumed.
+ */
+ private fun restoreStateAfterRead() {
+ var toRelease: ReadWriteBufferState.IdleNonEmpty? = null
+
+ val (_, newState) = updateState { state ->
+ // updateState re-runs this lambda when its CAS fails: undo the release lock taken by the
+ // previous attempt (resetForWrite) and wake the writer before trying again
+ toRelease?.let {
+ it.capacity.resetForWrite()
+ resumeWriteOp()
+ toRelease = null
+ }
+
+ val readStopped = state.stopReading()
+
+ if (readStopped is ReadWriteBufferState.IdleNonEmpty) {
+ // only attempt the release lock if the state has not changed since it was read
+ if (this.state === state && readStopped.capacity.tryLockForRelease()) {
+ toRelease = readStopped
+ ReadWriteBufferState.IdleEmpty
+ } else {
+ readStopped
+ }
+ } else {
+ readStopped
+ }
+ }
+
+ if (newState === ReadWriteBufferState.IdleEmpty) {
+ toRelease?.let { releaseBuffer(it.initial) }
+ resumeWriteOp()
+ }
+ }
+
+ /**
+ * Moves a closed channel to [ReadWriteBufferState.Terminated] when possible and resumes any
+ * suspended writer and reader.
+ *
+ * Does nothing if the channel is not closed. The transition happens only from
+ * [ReadWriteBufferState.IdleEmpty], or from an idle non-empty state when the channel was closed
+ * with a cause; in the latter case the buffer is released to the pool.
+ */
+ private fun tryTerminate() {
+ val closed = closed ?: return
+ var toRelease: ReadWriteBufferState.Initial? = null
+
+ updateState { state ->
+ when {
+ state === ReadWriteBufferState.IdleEmpty -> ReadWriteBufferState.Terminated
+ closed.cause != null && state is ReadWriteBufferState.IdleNonEmpty -> {
+ toRelease = state.initial
+ ReadWriteBufferState.Terminated
+ }
+ // any other state: non-local return from tryTerminate, nothing is changed or resumed
+ else -> return
+ }
+ }
+
+ toRelease?.let { buffer ->
+ if (state === ReadWriteBufferState.Terminated) {
+ releaseBuffer(buffer)
+ }
+ }
+
+ // a pending writer always fails; a pending reader fails with the cause or completes with false
+ WriteOp.getAndSet(this, null)?.resumeWithException(closed.sendException)
+ ReadOp.getAndSet(this, null)?.apply {
+ if (closed.cause != null) resumeWithException(closed.cause) else resume(false)
+ }
+ }
+
+    /**
+     * Wraps [idx] back towards the start of the buffer once it reaches the usable limit
+     * (`capacity() - reservedSize`); smaller indices are returned unchanged.
+     */
+    private fun ByteBuffer.carryIndex(idx: Int): Int {
+        val usable = capacity() - reservedSize
+        return if (idx < usable) idx else idx - usable
+    }
+
+ /**
+ * Runs [block] on the write buffer obtained from [setupStateForWrite].
+ *
+ * If the channel is already closed, the close exception is thrown instead of running [block].
+ * In every case (including failure) the channel is flushed when the capacity is full or
+ * [autoFlush] is set, the state is restored via [restoreStateAfterWrite] and termination is
+ * attempted.
+ */
+ private inline fun writing(block: ByteBuffer.(RingBufferCapacity) -> Unit) {
+ val buffer = setupStateForWrite()
+ val capacity = state.capacity
+ try {
+ closed?.let { throw it.sendException }
+ block(buffer, capacity)
+ } finally {
+ if (capacity.isFull() || autoFlush) flush()
+ restoreStateAfterWrite()
+ tryTerminate()
+ }
+ }
+
+ /**
+ * Runs [block] on the read buffer obtained from [setupStateForRead] and returns its result.
+ *
+ * Returns `false` without running [block] when no read buffer could be set up or when nothing
+ * is available for reading. Once a read buffer was set up, the state is always restored via
+ * [restoreStateAfterRead] and termination is attempted, even if [block] throws.
+ */
+ private inline fun reading(block: ByteBuffer.(RingBufferCapacity) -> Boolean): Boolean {
+ val buffer = setupStateForRead() ?: return false
+ val capacity = state.capacity
+ try {
+ if (capacity.availableForRead == 0) return false
+
+ return block(buffer, capacity)
+ } finally {
+ restoreStateAfterRead()
+ tryTerminate()
+ }
+ }
+
+ private fun readAsMuchAsPossible(dst: ByteBuffer, consumed0: Int = 0): Int {
+ var consumed = 0
+
+ val rc = reading {
+ val part = it.tryReadAtMost(minOf(remaining(), dst.remaining()))
+ if (part > 0) {
+ consumed += part
+
+ if (dst.remaining() >= remaining()) {
+ dst.put(this)
+ } else {
+ repeat(part) {
+ dst.put(get())
+ }
+ }
+
+ bytesRead(it, part)
+ true
+ } else {
+ false
+ }
+ }
+
+ return if (rc && dst.hasRemaining() && state.capacity.availableForRead > 0)
+ readAsMuchAsPossible(dst, consumed0 + consumed)
+ else consumed + consumed0
+ }
+
+ private fun readAsMuchAsPossible(dst: ByteArray, offset: Int, length: Int, consumed0: Int = 0): Int {
+ var consumed = 0
+
+ val rc = reading {
+ val part = it.tryReadAtMost(minOf(remaining(), length))
+ if (part > 0) {
+ consumed += part
+ get(dst, offset, part)
+
+ bytesRead(it, part)
+ true
+ } else {
+ false
+ }
+ }
+
+ return if (rc && consumed < length && state.capacity.availableForRead > 0)
+ readAsMuchAsPossible(dst, offset + consumed, length - consumed, consumed0 + consumed)
+ else consumed + consumed0
+ }
+
+ suspend override fun readFully(dst: ByteArray, offset: Int, length: Int) {
+ val consumed = readAsMuchAsPossible(dst, offset, length)
+
+ if (consumed < length) {
+ readFullySuspend(dst, offset + consumed, length - consumed)
+ }
+ }
+
+ suspend override fun readFully(dst: ByteBuffer): Int {
+ val rc = readAsMuchAsPossible(dst)
+ if (!dst.hasRemaining()) return rc
+
+ return readFullySuspend(dst, rc)
+ }
+
+ private suspend fun readFullySuspend(dst: ByteBuffer, rc0: Int): Int {
+ var copied = rc0
+
+ while (dst.hasRemaining()) {
+ if (!readSuspend(1)) throw ClosedReceiveChannelException("Unexpected EOF: expected ${dst.remaining()} more bytes")
+ copied += readAsMuchAsPossible(dst)
+ }
+
+ return copied
+ }
+
+ private suspend tailrec fun readFullySuspend(dst: ByteArray, offset: Int, length: Int) {
+ if (!readSuspend(1)) throw ClosedReceiveChannelException("Unexpected EOF: expected $length more bytes")
+
+ val consumed = readAsMuchAsPossible(dst, offset, length)
+
+ if (consumed < length) {
+ readFullySuspend(dst, offset + consumed, length - consumed)
+ }
+ }
+
+    /**
+     * Reads whatever is available into [dst] starting at [offset], at most [length] bytes.
+     *
+     * Suspends only when nothing could be read, the channel is not closed and [length] is not zero.
+     *
+     * @return the number of bytes copied, or `-1` if nothing was read and the channel is closed.
+     */
+    suspend override fun readAvailable(dst: ByteArray, offset: Int, length: Int): Int {
+        val consumed = readAsMuchAsPossible(dst, offset, length)
+
+        if (consumed == 0 && closed != null) return -1
+        if (consumed > 0 || length == 0) return consumed
+
+        return readAvailableSuspend(dst, offset, length)
+    }
+
+ suspend override fun readAvailable(dst: ByteBuffer): Int {
+ val consumed = readAsMuchAsPossible(dst)
+
+ return when {
+ consumed == 0 && closed != null -> -1
+ consumed > 0 || !dst.hasRemaining() -> consumed
+ else -> readAvailableSuspend(dst)
+ }
+ }
+
+ private suspend fun readAvailableSuspend(dst: ByteArray, offset: Int, length: Int): Int {
+ if (!readSuspend(1)) return -1
+ return readAvailable(dst, offset, length)
+ }
+
+ private suspend fun readAvailableSuspend(dst: ByteBuffer): Int {
+ if (!readSuspend(1)) return -1
+ return readAvailable(dst)
+ }
+
+ suspend override fun readPacket(size: Int): ByteReadPacket {
+ closed?.cause?.let { throw it }
+
+ if (size == 0) return ByteReadPacketEmpty
+
+ val builder = ByteWritePacketImpl(BufferPool)
+ val buffer = BufferPool.borrow()
+
+ try {
+ var remaining = size
+ while (remaining > 0) {
+ buffer.clear()
+ if (buffer.remaining() > remaining) {
+ buffer.limit(remaining)
+ }
+
+ val rc = readFully(buffer)
+ buffer.flip()
+ builder.writeFully(buffer)
+
+ remaining -= rc
+ }
+
+ return builder.build()
+ } catch (t: Throwable) {
+ builder.release()
+ throw t
+ } finally {
+ BufferPool.recycle(buffer)
+ }
+ }
+
+ suspend override fun readByte(): Byte {
+ var b: Byte = 0
+
+ val rc = reading {
+ if (it.tryReadExact(1)) {
+ b = get()
+ bytesRead(it, 1)
+ true
+ } else false
+ }
+
+ return if (rc) {
+ b
+ } else {
+ readByteSuspend()
+ }
+ }
+
+ private suspend fun readByteSuspend(): Byte {
+ if (!readSuspend(1)) throw ClosedReceiveChannelException("EOF: one byte required")
+ return readByte()
+ }
+
+ suspend override fun readBoolean(): Boolean {
+ var b = false
+
+ val rc = reading {
+ if (it.tryReadExact(1)) {
+ b = get() != 0.toByte()
+ bytesRead(it, 1)
+ true
+ } else false
+ }
+
+ return if (rc) {
+ b
+ } else {
+ readBooleanSuspend()
+ }
+ }
+
+ private suspend fun readBooleanSuspend(): Boolean {
+ if (!readSuspend(1)) throw ClosedReceiveChannelException("EOF: one byte required")
+ return readBoolean()
+ }
+
+ suspend override fun readShort(): Short {
+ var sh: Short = 0
+
+ val rc = reading {
+ if (it.tryReadExact(2)) {
+ if (remaining() < 2) rollBytes(2)
+ sh = getShort()
+ bytesRead(it, 2)
+ true
+ } else false
+ }
+
+ return if (rc) {
+ sh
+ } else {
+ readShortSuspend()
+ }
+ }
+
+    /**
+     * Slow path of [readShort]: waits until two bytes are available, then retries the read.
+     *
+     * @throws ClosedReceiveChannelException if the channel is closed before two bytes are available.
+     */
+    private suspend fun readShortSuspend(): Short {
+        // the message used to say "byte", although two bytes (a short) are requested here
+        if (!readSuspend(2)) throw ClosedReceiveChannelException("EOF while a short expected")
+        return readShort()
+    }
+
+ suspend override fun readInt(): Int {
+ var i = 0
+
+ val rc = reading {
+ if (it.tryReadExact(4)) {
+ if (remaining() < 4) rollBytes(4)
+ i = getInt()
+ bytesRead(it, 4)
+ true
+ } else false
+ }
+
+ return if (rc) {
+ i
+ } else {
+ readIntSuspend()
+ }
+ }
+
+ private suspend fun readIntSuspend(): Int {
+ if (!readSuspend(4)) throw ClosedReceiveChannelException("EOF while an int expected")
+ return readInt()
+ }
+
+ suspend override fun readLong(): Long {
+ var i = 0L
+
+ val rc = reading {
+ if (it.tryReadExact(8)) {
+ if (remaining() < 8) rollBytes(8)
+ i = getLong()
+ bytesRead(it, 8)
+ true
+ } else false
+ }
+
+ return if (rc) {
+ i
+ } else {
+ readLongSuspend()
+ }
+ }
+
+ private suspend fun readLongSuspend(): Long {
+ if (!readSuspend(8)) throw ClosedReceiveChannelException("EOF while a long expected")
+ return readLong()
+ }
+
+ suspend override fun readDouble(): Double {
+ var d = 0.0
+
+ val rc = reading {
+ if (it.tryReadExact(8)) {
+ if (remaining() < 8) rollBytes(8)
+ d = getDouble()
+ bytesRead(it, 8)
+ true
+ } else false
+ }
+
+ return if (rc) {
+ d
+ } else {
+ readDoubleSuspend()
+ }
+ }
+
+ private suspend fun readDoubleSuspend(): Double {
+ if (!readSuspend(8)) throw ClosedReceiveChannelException("EOF while a double expected")
+ return readDouble()
+ }
+
+ suspend override fun readFloat(): Float {
+ var f = 0.0f
+
+ val rc = reading {
+ if (it.tryReadExact(4)) {
+ if (remaining() < 4) rollBytes(4)
+ f = getFloat()
+ bytesRead(it, 4)
+ true
+ } else false
+ }
+
+ return if (rc) {
+ f
+ } else {
+ readFloatSuspend()
+ }
+ }
+
+    /**
+     * Slow path of [readFloat]: waits until four bytes are available, then retries the read.
+     *
+     * @throws ClosedReceiveChannelException if the channel is closed before four bytes are available.
+     */
+    private suspend fun readFloatSuspend(): Float {
+        // the message used to say "an int" (copied from readIntSuspend)
+        if (!readSuspend(4)) throw ClosedReceiveChannelException("EOF while a float expected")
+        return readFloat()
+    }
+
+    /**
+     * Prepares a value of [n] bytes that wraps around the end of the ring for a single relative
+     * `get`: the missing bytes are copied from the start of the buffer into the reserved area
+     * right after the usable limit (`capacity() - reservedSize`), and the limit is extended so
+     * that exactly [n] bytes remain.
+     *
+     * Callers invoke this only when `remaining() < n`.
+     */
+    private fun ByteBuffer.rollBytes(n: Int) {
+        // must be captured before the limit is extended: afterwards remaining() == n and the
+        // number of bytes to roll would always evaluate to zero
+        val rem = remaining()
+        val base = capacity() - reservedSize
+
+        limit(position() + n)
+        // the wrapped tail of the value lives at indices 0 until (n - rem)
+        for (i in 0 until n - rem) {
+            put(base + i, get(i))
+        }
+    }
+
+ /**
+ * Copies the bytes located between the usable limit (`capacity() - reservedSize`) and the
+ * current position to the beginning of the buffer, preserving their order.
+ */
+ private fun ByteBuffer.carry() {
+ val base = capacity() - reservedSize
+ for (i in base until position()) {
+ put(i - base, get(i))
+ }
+ }
+
+ private fun ByteBuffer.bytesWritten(c: RingBufferCapacity, n: Int) {
+ require(n >= 0)
+
+ writePosition = carryIndex(writePosition + n)
+ c.completeWrite(n)
+ }
+
+ private fun ByteBuffer.bytesRead(c: RingBufferCapacity, n: Int) {
+ require(n >= 0)
+
+ readPosition = carryIndex(readPosition + n)
+ c.completeRead(n)
+ resumeWriteOp()
+ }
+
+ suspend override fun writeByte(b: Byte) {
+ writing {
+ tryWriteByte(b, it)
+ }
+ }
+
+ private suspend fun ByteBuffer.tryWriteByte(b: Byte, c: RingBufferCapacity) {
+ if (c.tryWriteExact(1)) {
+ put(b)
+ bytesWritten(c, 1)
+ } else {
+ writeByteSuspend(b, c)
+ }
+ }
+
+ private suspend fun ByteBuffer.writeByteSuspend(b: Byte, c: RingBufferCapacity) {
+ writeSuspend(1)
+ tryWriteByte(b, c)
+ }
+
+ suspend override fun writeShort(s: Short) {
+ writing {
+ if (!tryWriteShort(s, it)) {
+ writeShortSuspend(s, it)
+ }
+ }
+ }
+
+ private suspend fun ByteBuffer.writeShortSuspend(s: Short, c: RingBufferCapacity) {
+ writeSuspend(2)
+ tryWriteShort(s, c)
+ }
+
+ private fun ByteBuffer.tryWriteShort(s: Short, c: RingBufferCapacity): Boolean {
+ if (c.tryWriteExact(2)) {
+ if (remaining() < 2) {
+ limit(capacity())
+ putShort(s)
+ carry()
+ } else {
+ putShort(s)
+ }
+
+ bytesWritten(c, 2)
+ return true
+ }
+
+ return false
+ }
+
+ private fun ByteBuffer.tryWriteInt(i: Int, c: RingBufferCapacity): Boolean {
+ if (c.tryWriteExact(4)) {
+ if (remaining() < 4) {
+ limit(capacity())
+ putInt(i)
+ carry()
+ } else {
+ putInt(i)
+ }
+
+ bytesWritten(c, 4)
+ return true
+ }
+
+ return false
+ }
+
+ suspend override fun writeInt(i: Int) {
+ writing {
+ if (!tryWriteInt(i, it)) {
+ writeIntSuspend(i, it)
+ }
+ }
+ }
+
+ private suspend fun ByteBuffer.writeIntSuspend(i: Int, c: RingBufferCapacity) {
+ writeSuspend(4)
+ tryWriteInt(i, c)
+ }
+
+ private fun ByteBuffer.tryWriteLong(l: Long, c: RingBufferCapacity): Boolean {
+ if (c.tryWriteExact(8)) {
+ if (remaining() < 8) {
+ limit(capacity())
+ putLong(l)
+ carry()
+ } else {
+ putLong(l)
+ }
+
+ bytesWritten(c, 8)
+ return true
+ }
+
+ return false
+ }
+
+ suspend override fun writeLong(l: Long) {
+ writing {
+ if (!tryWriteLong(l, it)) {
+ writeLongSuspend(l, it)
+ }
+ }
+ }
+
+ private suspend fun ByteBuffer.writeLongSuspend(l: Long, c: RingBufferCapacity) {
+ writeSuspend(8)
+ tryWriteLong(l, c)
+ }
+
+ suspend override fun writeDouble(d: Double) {
+ writeLong(java.lang.Double.doubleToRawLongBits(d))
+ }
+
+ suspend override fun writeFloat(f: Float) {
+ writeInt(java.lang.Float.floatToRawIntBits(f))
+ }
+
+ suspend override fun writeAvailable(src: ByteBuffer): Int {
+ val copied = writeAsMuchAsPossible(src)
+ if (copied > 0) return copied
+
+ return writeLazySuspend(src)
+ }
+
+ private suspend fun writeLazySuspend(src: ByteBuffer): Int {
+ while (true) {
+ writeSuspend(1)
+ val copied = writeAvailable(src)
+ if (copied > 0) return copied
+ }
+ }
+
+ suspend override fun writeFully(src: ByteBuffer) {
+ writeAsMuchAsPossible(src)
+ if (!src.hasRemaining()) return
+
+ return writeFullySuspend(src)
+ }
+
+ private suspend fun writeFullySuspend(src: ByteBuffer) {
+ while (src.hasRemaining()) {
+ writeSuspend(1)
+ writeAsMuchAsPossible(src)
+ }
+ }
+
+ private fun writeAsMuchAsPossible(src: ByteBuffer): Int {
+ writing {
+ var written = 0
+
+ do {
+ val possibleSize = it.tryWriteAtMost(minOf(src.remaining(), remaining()))
+ if (possibleSize == 0) break
+ require(possibleSize > 0)
+
+ if (remaining() >= src.remaining()) {
+ put(src)
+ } else {
+ repeat(possibleSize) {
+ put(src.get())
+ }
+ }
+
+ written += possibleSize
+
+ prepareBuffer(writeByteOrder, carryIndex(writePosition + written), it.availableForWrite)
+ } while (true)
+
+ bytesWritten(it, written)
+
+ return written
+ }
+
+ return 0
+ }
+
+ private fun writeAsMuchAsPossible(src: ByteArray, offset: Int, length: Int): Int {
+ writing {
+ var written = 0
+
+ do {
+ val possibleSize = it.tryWriteAtMost(minOf(length - written, remaining()))
+ if (possibleSize == 0) break
+ require(possibleSize > 0)
+
+ put(src, offset + written, possibleSize)
+ written += possibleSize
+
+ prepareBuffer(writeByteOrder, carryIndex(writePosition + written), it.availableForWrite)
+ } while (true)
+
+ bytesWritten(it, written)
+
+ return written
+ }
+
+ return 0
+ }
+
+    /**
+     * Writes [length] bytes of [src] starting at [offset].
+     *
+     * Copies as much as fits without suspension first and delegates whatever is left
+     * to [writeFullySuspend].
+     */
+    suspend override fun writeFully(src: ByteArray, offset: Int, length: Int) {
+        var position = offset
+        var left = length
+
+        // fast path: keep copying while the channel accepts bytes
+        while (left > 0) {
+            val written = writeAsMuchAsPossible(src, position, left)
+            if (written == 0) break
+
+            position += written
+            left -= written
+        }
+
+        if (left != 0) {
+            writeFullySuspend(src, position, left)
+        }
+    }
+
+ private tailrec suspend fun writeFullySuspend(src: ByteArray, offset: Int, length: Int) {
+ if (length == 0) return
+ val copied = writeAvailable(src, offset, length)
+ return writeFullySuspend(src, offset + copied, length - copied)
+ }
+
+ suspend override fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int {
+ val size = writeAsMuchAsPossible(src, offset, length)
+ if (size > 0) return size
+ return writeSuspend(src, offset, length)
+ }
+
+ private suspend fun writeSuspend(src: ByteArray, offset: Int, length: Int): Int {
+ while (true) {
+ writeSuspend(1)
+ val size = writeAsMuchAsPossible(src, offset, length)
+ if (size > 0) return size
+ }
+ }
+
+ suspend override fun writePacket(packet: ByteReadPacket) {
+ closed?.sendException?.let { packet.release(); throw it }
+
+ when (packet) {
+ is ByteReadPacketEmpty -> return
+ is ByteReadPacketSingle -> writeSingleBufferPacket(packet)
+ is ByteReadPacketImpl -> writeMultipleBufferPacket(packet)
+ else -> writeExternalPacket(packet)
+ }
+ }
+
+ private suspend fun writeSingleBufferPacket(packet: ByteReadPacketSingle) {
+ val buffer = packet.steal()
+ val t = try {
+ writeAsMuchAsPossible(buffer)
+ null
+ } catch (t: Throwable) {
+ t
+ }
+
+ if (t != null) {
+ packet.pool.recycle(buffer)
+ throw t
+ }
+
+ if (buffer.hasRemaining()) {
+ return writeSingleBufferPacketSuspend(buffer, packet)
+ }
+
+ packet.pool.recycle(buffer)
+ }
+
+    /**
+     * Writes [packet] buffer by buffer without suspension while the channel accepts data, then
+     * continues in [writeMultipleBufferPacketSuspend] with the first buffer that did not fit.
+     *
+     * Every stolen buffer is recycled exactly once: either here after it was written completely,
+     * here on failure, or by the suspending continuation it is handed over to.
+     */
+    private suspend fun writeMultipleBufferPacket(packet: ByteReadPacketImpl) {
+        // the stolen buffer still owned by this function: not fully written and not yet recycled
+        var pending: ByteBuffer? = null
+
+        val failure = try {
+            while (packet.remaining > 0) {
+                val current = packet.steal()
+                pending = current
+                writeAsMuchAsPossible(current)
+                if (current.hasRemaining()) break
+
+                // give up ownership before recycling so the buffer can never be recycled twice
+                pending = null
+                packet.pool.recycle(current)
+            }
+            null
+        } catch (t: Throwable) { t }
+
+        val leftover = pending
+
+        if (failure != null) {
+            leftover?.let { packet.pool.recycle(it) }
+            packet.release()
+            throw failure
+        }
+
+        if (leftover != null) {
+            return writeMultipleBufferPacketSuspend(leftover, packet)
+        }
+
+        packet.release()
+    }
+
+ private suspend fun writeSingleBufferPacketSuspend(buffer: ByteBuffer, packet: ByteReadPacketSingle) {
+ try {
+ writeFully(buffer)
+ } finally {
+ packet.pool.recycle(buffer)
+ }
+ }
+
+ private suspend fun writeMultipleBufferPacketSuspend(rem: ByteBuffer, packet: ByteReadPacketImpl) {
+ var buffer = rem
+
+ try {
+ do {
+ writeFully(buffer)
+ if (packet.remaining == 0) break
+ packet.pool.recycle(buffer)
+ buffer = packet.steal()
+ } while (true)
+ } finally {
+ packet.pool.recycle(buffer)
+ packet.release()
+ }
+ }
+
+    /**
+     * Writes a packet of an unknown implementation by copying it through a temporary pooled buffer.
+     *
+     * Chunks are written without suspension while the channel accepts them. As soon as a chunk
+     * does not fit completely, the rest is handed over to [writeExternalPacketSuspend] together
+     * with the temporary buffer. On failure the buffer is recycled, the packet is released and
+     * the failure is rethrown.
+     */
+    private suspend fun writeExternalPacket(packet: ByteReadPacket) {
+        val buffer = BufferPool.borrow()
+        // true only when the last chunk was written partially and still has unwritten bytes
+        var pending = false
+
+        val failure = try {
+            while (packet.remaining > 0) {
+                buffer.clear()
+                packet.readLazy(buffer)
+                buffer.flip()
+                writeAsMuchAsPossible(buffer)
+                if (buffer.hasRemaining()) {
+                    // keep the buffer in read mode (position..limit is the unwritten tail);
+                    // writeExternalPacketSuspend compacts it before reading more
+                    pending = true
+                    break
+                }
+            }
+
+            null
+        } catch (t: Throwable) {
+            t
+        }
+
+        if (failure != null) {
+            BufferPool.recycle(buffer)
+            packet.release()
+            throw failure
+        }
+
+        // do not flip() here: flipping a completely written buffer would expose the bytes that
+        // have already been written and they would be sent a second time
+        if (pending) {
+            return writeExternalPacketSuspend(buffer, packet)
+        }
+
+        BufferPool.recycle(buffer)
+        packet.release()
+    }
+
+ private suspend fun writeExternalPacketSuspend(buffer: ByteBuffer, packet: ByteReadPacket) {
+ try {
+ do {
+ buffer.compact()
+ packet.readLazy(buffer)
+ buffer.flip()
+ writeFully(buffer)
+ } while (packet.remaining > 0)
+ } finally {
+ BufferPool.recycle(buffer)
+ packet.release()
+ }
+ }
+
+ /**
+ * Invokes [visitor] for every available batch of bytes until all bytes are processed or
+ * [visitor] returns `false`.
+ * Never invokes [visitor] with an empty buffer unless `last` is `true`. Invokes [visitor] with
+ * `last = true` at most once, even if there are remaining bytes and [visitor] returned `true`.
+ *
+ * Tries the non-suspending path first and suspends only if that path did not finish the job.
+ */
+ override suspend fun lookAhead(visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean) {
+ if (lookAheadFast(false, visitor)) return
+ lookAheadSuspend(visitor)
+ }
+
+ private inline fun lookAheadFast(last: Boolean, visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean): Boolean {
+ if (state === ReadWriteBufferState.Terminated && !last) return false
+
+ val rc = reading {
+ do {
+ val available = state.capacity.availableForRead
+
+ val rem = if (available > 0 || last) {
+ if (!visitor(this, last)) {
+ afterBufferVisited(this, it)
+ return true
+ }
+
+ val consumed = afterBufferVisited(this, it)
+ available - consumed
+ } else 0
+ } while (rem > 0 && !last)
+
+ last
+ }
+
+ if (!rc && closed != null) {
+ visitor(EmptyByteBuffer, true)
+ }
+
+ return rc
+ }
+
+    /**
+     * Suspending part of [lookAhead]: alternates between [lookAheadFast] and waiting for at least
+     * one more byte. After the wait reports that no more bytes will come, [lookAheadFast] is
+     * invoked one final time with `last = true`.
+     *
+     * @return `true` if [lookAheadFast] reported completion, `false` if the final pass did not.
+     */
+    private suspend fun lookAheadSuspend(visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean): Boolean {
+        var last = false
+
+        while (true) {
+            if (lookAheadFast(last, visitor)) return true
+            if (last) return false
+
+            // a failed wait means end of stream: run one more pass flagged as the last one
+            last = !readSuspend(1)
+        }
+    }
+
+ private fun afterBufferVisited(buffer: ByteBuffer, c: RingBufferCapacity): Int {
+ val consumed = buffer.position() - readPosition
+ if (consumed > 0) {
+ if (!c.tryReadExact(consumed)) throw IllegalStateException("Consumed more bytes than available")
+
+ buffer.bytesRead(c, consumed)
+ buffer.prepareBuffer(readByteOrder, readPosition, c.availableForRead)
+ }
+
+ return consumed
+ }
+
+ private suspend fun readUTF8LineToAscii(out: Appendable, limit: Int): Boolean {
+ if (state === ReadWriteBufferState.Terminated) return false
+
+ var cr = false
+ var consumed = 0
+ var unicodeStarted = false
+ var eol = false
+
+ lookAheadFast(false) { buffer, last ->
+ var forceConsume = false
+
+ val rejected = !buffer.decodeASCII { ch ->
+ when {
+ ch == '\r' -> {
+ cr = true
+ true
+ }
+ ch == '\n' -> {
+ eol = true
+ forceConsume = true
+ false
+ }
+ cr -> {
+ cr = false
+ eol = true
+ false
+ }
+ else -> {
+ if (consumed == limit) throw BufferOverflowException()
+ consumed++
+ out.append(ch)
+ true
+ }
+ }
+ }
+
+ if (cr && last) {
+ eol = true
+ }
+
+ if (eol && forceConsume) {
+ buffer.position(buffer.position() + 1)
+ }
+
+ if (rejected && buffer.hasRemaining() && !eol) {
+ unicodeStarted = true
+ false
+ } else
+ !eol && !last
+ }
+
+ if (eol && !unicodeStarted) return true
+ return readUTF8LineToUtf8(out, limit - consumed, cr, consumed)
+ }
+
+    /**
+     * Continues reading a line with the UTF-8 decoder using only bytes that are already available,
+     * then falls back to [readUTF8LineToUtf8Suspend] if no end of line was found.
+     *
+     * @param limit maximum number of characters that may still be appended to [out]
+     * @param cr0 carry-over of the `\r` flag from the previous stage
+     * @param consumed0 number of characters already appended by the previous stage
+     * @return `true` once an end of line was found, otherwise the result of the suspending stage
+     * @throws BufferOverflowException if more than [limit] characters would be appended
+     */
+    private suspend fun readUTF8LineToUtf8(out: Appendable, limit: Int, cr0: Boolean, consumed0: Int): Boolean {
+        var cr1 = cr0
+        var consumed1 = 0
+        var eol = false
+
+        lookAheadFast(false) { buffer, last ->
+            var forceConsume = false
+
+            val rc = buffer.decodeUTF8 { ch ->
+                when {
+                    ch == '\r' -> {
+                        cr1 = true
+                        true
+                    }
+                    ch == '\n' -> {
+                        eol = true
+                        forceConsume = true
+                        false
+                    }
+                    cr1 -> {
+                        cr1 = false
+                        eol = true
+                        false
+                    }
+                    else -> {
+                        if (consumed1 == limit) throw BufferOverflowException()
+                        consumed1++
+                        out.append(ch)
+                        true
+                    }
+                }
+            }
+
+            if (cr1 && last) {
+                eol = true
+            }
+
+            // the decoder stopped in front of '\n': step over it so it is not read again
+            if (eol && forceConsume) {
+                buffer.position(buffer.position() + 1)
+            }
+
+            rc != 0 && !eol && !last
+        }
+
+        if (eol) return true
+
+        // hand over only the part of the limit that is still unused; passing the full limit
+        // would let the line grow beyond the requested maximum
+        return readUTF8LineToUtf8Suspend(out, limit - consumed1, cr1, consumed1 + consumed0)
+    }
+
+    /**
+     * Suspending stage of UTF-8 line reading: decodes characters into [out], waiting for more
+     * bytes when required, until an end of line or the end of the stream is reached.
+     *
+     * @param limit maximum number of characters that may still be appended to [out]
+     * @param cr0 carry-over of the `\r` flag from the previous stage
+     * @param consumed0 number of characters already appended by previous stages
+     * @return `true` if an end of line was found or any character was appended
+     * @throws BufferOverflowException if more than [limit] characters would be appended
+     * @throws ClosedReceiveChannelException if the stream ends in the middle of a character
+     */
+    private suspend fun readUTF8LineToUtf8Suspend(out: Appendable, limit: Int, cr0: Boolean, consumed0: Int): Boolean {
+        var cr1 = cr0
+        var consumed1 = 0
+        var eol = false
+        var wrap = 0
+
+        lookAheadSuspend { buffer, last ->
+            var forceConsume = false
+
+            val rc = buffer.decodeUTF8 { ch ->
+                when {
+                    ch == '\r' -> {
+                        cr1 = true
+                        true
+                    }
+                    ch == '\n' -> {
+                        eol = true
+                        forceConsume = true
+                        false
+                    }
+                    cr1 -> {
+                        cr1 = false
+                        eol = true
+                        false
+                    }
+                    else -> {
+                        if (consumed1 == limit) throw BufferOverflowException()
+                        consumed1++
+                        out.append(ch)
+                        true
+                    }
+                }
+            }
+
+            if (cr1 && last) {
+                eol = true
+            }
+
+            // the decoder stopped in front of '\n': step over it so it is not read again
+            if (eol && forceConsume) {
+                buffer.position(buffer.position() + 1)
+            }
+
+            // a positive decoder result is used as the size to wait for before retrying
+            wrap = maxOf(0, rc)
+
+            wrap == 0 && !eol && !last
+        }
+
+        if (wrap != 0) {
+            if (!readSuspend(wrap)) {
+                // the missing bytes will never arrive: retrying would decode the same
+                // incomplete sequence again and recurse without end
+                throw ClosedReceiveChannelException("Unexpected EOF: incomplete UTF-8 character at the end of the stream")
+            }
+
+            // carry over the total count and only the unused part of the limit
+            return readUTF8LineToUtf8Suspend(out, limit - consumed1, cr1, consumed0 + consumed1)
+        }
+
+        return (consumed1 > 0 || consumed0 > 0 || eol)
+    }
+
+ suspend override fun <A : Appendable> readUTF8LineTo(out: A, limit: Int) = readUTF8LineToAscii(out, limit)
+
+ private fun resumeReadOp() {
+ ReadOp.getAndSet(this, null)?.resume(true)
+ }
+
+ private fun resumeWriteOp() {
+ WriteOp.getAndSet(this, null)?.apply {
+ val closed = closed
+ if (closed == null) resume(Unit) else resumeWithException(closed.sendException)
+ }
+ }
+
+ /**
+ * Resumes continuations that are suspended at the moment the channel is closed.
+ *
+ * A pending reader fails with [cause] if there is one; otherwise it is resumed with `true` when
+ * bytes are still available for reading and with `false` when not. A pending writer is failed
+ * with [cause] or, when there is no cause, with a `ClosedSendChannelException`.
+ */
+ private fun resumeClosed(cause: Throwable?) {
+ ReadOp.getAndSet(this, null)?.let { c ->
+ if (cause != null)
+ c.resumeWithException(cause)
+ else
+ c.resume(state.capacity.availableForRead > 0)
+ }
+
+ // NOTE(review): ClosedElement.sendException uses ClosedWriteChannelException for the same
+ // situation, so writers may observe two different exception types — confirm which is intended
+ WriteOp.getAndSet(this, null)?.tryResumeWithException(cause ?: ClosedSendChannelException(null))
+ }
+
+ private tailrec suspend fun readSuspend(size: Int): Boolean {
+ if (state.capacity.availableForRead >= size) return true
+
+ closed?.let { c ->
+ if (c.cause == null) return false
+ throw c.cause
+ }
+
+ if (!readSuspendImpl(size)) return false
+
+ return readSuspend(size)
+ }
+
+ private suspend fun readSuspendImpl(size: Int): Boolean {
+ if (state.capacity.availableForRead >= size) return true
+
+ return suspendCancellableCoroutine(holdCancellability = true) { c ->
+ do {
+ if (state.capacity.availableForRead >= size) {
+ c.resume(true)
+ break
+ }
+
+ closed?.let {
+ if (it.cause == null && state.capacity.availableForRead == 0) {
+ c.resume(false)
+ return@suspendCancellableCoroutine
+ } else if (it.cause != null) {
+ c.resumeWithException(it.cause)
+ return@suspendCancellableCoroutine
+ }
+ }
+ } while (!setContinuation({ readOp }, ReadOp, c, { closed == null && state.capacity.availableForRead < size }))
+ }
+ }
+
+ private suspend fun writeSuspend(size: Int) {
+ closed?.sendException?.let { throw it }
+
+ while (state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty && closed == null) {
+ suspendCancellableCoroutine<Unit>(holdCancellability = true) { c ->
+ do {
+ closed?.sendException?.let { throw it }
+ if (state.capacity.availableForWrite >= size || state === ReadWriteBufferState.IdleEmpty) {
+ c.resume(Unit)
+ break
+ }
+ } while (!setContinuation({ writeOp }, WriteOp, c, { closed == null && state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty }))
+
+ flush()
+ }
+ }
+
+ closed?.sendException?.let { throw it }
+ }
+
+ /**
+ * Installs [continuation] into the field behind [updater] as long as [predicate] (the
+ * "still has to wait" condition) holds.
+ *
+ * @return `true` if the continuation stays installed and the caller remains suspended;
+ * `false` if [predicate] does not hold (any more) and nothing is left installed by this call.
+ * @throws IllegalStateException if [getter] already returns a continuation.
+ */
+ private inline fun <T, C : CancellableContinuation<T>> setContinuation(getter: () -> C?, updater: AtomicReferenceFieldUpdater<ByteBufferChannel, C?>, continuation: C, predicate: () -> Boolean): Boolean {
+ while (true) {
+ val current = getter()
+ if (current != null) throw IllegalStateException("Operation is already in progress")
+
+ if (!predicate()) {
+ return false
+ }
+
+ if (updater.compareAndSet(this, null, continuation)) {
+ // re-check after publishing: if the condition changed in between, try to take the
+ // continuation back; failing to take it back means somebody else already removed it,
+ // so it is treated as installed
+ if (predicate() || !updater.compareAndSet(this, continuation, null)) {
+ continuation.initCancellability()
+ return true
+ }
+
+ return false
+ }
+ }
+ }
+
+ private fun newBuffer(): ReadWriteBufferState.Initial {
+ val result = pool.borrow()
+
+ result.readBuffer.order(readByteOrder)
+ result.writeBuffer.order(writeByteOrder)
+ result.capacity.resetForWrite()
+
+ return result
+ }
+
+ private fun releaseBuffer(buffer: ReadWriteBufferState.Initial) {
+ pool.recycle(buffer)
+ }
+
+ // todo: replace with atomicfu
+ private inline fun updateState(block: (ReadWriteBufferState) -> ReadWriteBufferState?):
+ Pair<ReadWriteBufferState, ReadWriteBufferState> = update({ state }, State, block)
+
+ // todo: replace with atomicfu
+ /**
+ * Compare-and-set loop over the field behind [updater].
+ *
+ * Reads the current value with [getter], computes a replacement with [block] and retries until
+ * either [block] returns the very same instance or the compare-and-set succeeds. When [block]
+ * returns `null` the value is re-read and [block] is invoked again, so [block] may run
+ * several times.
+ *
+ * @return the pair of the value that was observed and the value that replaced it.
+ */
+ private inline fun <T : Any> update(getter: () -> T, updater: AtomicReferenceFieldUpdater<ByteBufferChannel, T>, block: (old: T) -> T?): Pair<T, T> {
+ while (true) {
+ val old = getter()
+ val newValue = block(old) ?: continue
+ if (old === newValue || updater.compareAndSet(this, old, newValue)) return Pair(old, newValue)
+ }
+ }
+
+ companion object {
+
+ private const val ReservedLongIndex = -8
+
+ // todo: replace with atomicfu, remove companion object
+ private val State = updater(ByteBufferChannel::state)
+ private val WriteOp = updater(ByteBufferChannel::writeOp)
+ private val ReadOp = updater(ByteBufferChannel::readOp)
+ private val Closed = updater(ByteBufferChannel::closed)
+ }
+
+ /**
+ * Marker stored in the channel once it has been closed; [cause] is `null` for a close without
+ * a failure.
+ */
+ private class ClosedElement(val cause: Throwable?) {
+ /**
+ * Exception to throw at writers: the close [cause] if present, otherwise a new
+ * [ClosedWriteChannelException] created on every access.
+ */
+ val sendException: Throwable
+ get() = cause ?: ClosedWriteChannelException("The channel was closed")
+
+ companion object {
+ // shared instance for a close without a cause
+ val EmptyCause = ClosedElement(null)
+ }
+ }
+}
+
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteChannel.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteChannel.kt
new file mode 100644
index 0000000..ebcfc1b
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteChannel.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.io.internal.EmptyByteBuffer
+
+/**
+ * Channel for asynchronous reading and writing of sequences of bytes.
+ * This is a buffered **single-reader single-writer channel**.
+ *
+ * A read operation can be invoked concurrently with a write operation, but two reads (or two writes)
+ * must never be invoked concurrently with each other. The exceptions are [close] and [flush], which can be
+ * invoked concurrently with any other operation, and with each other, at any time.
+ */
+interface ByteChannel : ByteReadChannel, ByteWriteChannel
+
+/** Creates a buffered channel for asynchronous reading and writing of sequences of bytes. */
+public fun ByteChannel(autoFlush: Boolean = false): ByteChannel {
+    // delegates to ByteBufferChannel, forwarding the auto-flush setting unchanged
+    return ByteBufferChannel(autoFlush)
+}
+
+/** Creates a channel for reading from the specified byte buffer [content]. */
+public fun ByteReadChannel(content: ByteBuffer): ByteReadChannel {
+    // delegates to ByteBufferChannel, passing the buffer through as-is
+    return ByteBufferChannel(content)
+}
+
+/** Creates a channel for reading from the specified byte array [content]. */
+public fun ByteReadChannel(content: ByteArray): ByteReadChannel {
+    val wrapped = ByteBuffer.wrap(content) // a buffer view over the array, no copy is made here
+    return ByteBufferChannel(wrapped)
+}
+
+
+/**
+ * Byte channel that is always empty: it is created over the zero-capacity [EmptyByteBuffer].
+ */
+val EmptyByteReadChannel: ByteReadChannel = ByteReadChannel(EmptyByteBuffer)
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteChannelCoroutine.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteChannelCoroutine.kt
new file mode 100644
index 0000000..f4803e5
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteChannelCoroutine.kt
@@ -0,0 +1,17 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+internal open class ByteChannelCoroutine( // coroutine that closes [channel] when it completes
+    parentContext: CoroutineContext,
+    open val channel: ByteChannel
+) : AbstractCoroutine<Unit>(parentContext, active = true) {
+    override fun afterCompletion(state: Any?, mode: Int) {
+        val failure = (state as? JobSupport.CompletedExceptionally)?.cause
+        val closedNow = channel.close(failure)
+        // when close() reports false the failure is routed to the coroutine exception handler instead
+        if (!closedNow && failure != null) handleCoroutineException(context, failure)
+        super.afterCompletion(state, mode)
+    }
+}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteOrder.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteOrder.kt
new file mode 100644
index 0000000..bad461f
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteOrder.kt
@@ -0,0 +1,6 @@
+package kotlinx.coroutines.experimental.io
+
+/**
+ * Byte order for multi-byte read and write operations; an alias of [java.nio.ByteOrder].
+ */
+typealias ByteOrder = java.nio.ByteOrder
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt
new file mode 100644
index 0000000..d455443
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt
@@ -0,0 +1,132 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.io.internal.BufferObjectPool
+import kotlinx.coroutines.experimental.io.packet.ByteReadPacket
+
+/**
+ * Channel for asynchronous reading of sequences of bytes.
+ * This is a **single-reader channel**.
+ *
+ * Operations on this channel cannot be invoked concurrently.
+ */
+public interface ByteReadChannel {
+ /**
+ * Returns number of bytes that can be read without suspension. Read operations do not suspend and return
+ * immediately when this number is at least the number of bytes requested for read.
+ */
+ public val availableForRead: Int
+
+ /**
+ * Returns `true` if the channel is closed and no remaining bytes are available for read.
+ * It implies that [availableForRead] is zero.
+ */
+ public val isClosedForRead: Boolean
+
+ /**
+ * Byte order that is used for multi-byte read operations
+ * (such as [readShort], [readInt], [readLong], [readFloat], and [readDouble]).
+ */
+ public var readByteOrder: ByteOrder
+
+ /**
+ * Reads all available bytes to [dst] buffer and returns immediately or suspends if no bytes available
+ * @return number of bytes that were read or `-1` if the channel has been closed
+ */
+ suspend fun readAvailable(dst: ByteArray, offset: Int, length: Int): Int
+ suspend fun readAvailable(dst: ByteArray) = readAvailable(dst, 0, dst.size) // whole-array shorthand for the overload above
+ suspend fun readAvailable(dst: ByteBuffer): Int
+
+ /**
+ * Reads all [length] bytes to [dst] buffer or fails if channel has been closed.
+ * Suspends if not enough bytes available.
+ */
+ suspend fun readFully(dst: ByteArray, offset: Int, length: Int)
+ suspend fun readFully(dst: ByteArray) = readFully(dst, 0, dst.size) // whole-array shorthand for the overload above
+ suspend fun readFully(dst: ByteBuffer): Int
+
+ /**
+ * Reads the specified amount of bytes and makes a byte packet from them. Fails if channel has been closed
+ * and not enough bytes available.
+ */
+ suspend fun readPacket(size: Int): ByteReadPacket
+
+ /**
+ * Reads a long number (suspending if not enough bytes available) or fails if channel has been closed
+ * and not enough bytes.
+ */
+ suspend fun readLong(): Long
+
+ /**
+ * Reads an int number (suspending if not enough bytes available) or fails if channel has been closed
+ * and not enough bytes.
+ */
+ suspend fun readInt(): Int
+
+ /**
+ * Reads a short number (suspending if not enough bytes available) or fails if channel has been closed
+ * and not enough bytes.
+ */
+ suspend fun readShort(): Short
+
+ /**
+ * Reads a byte (suspending if no bytes available yet) or fails if channel has been closed
+ * and not enough bytes.
+ */
+ suspend fun readByte(): Byte
+
+ /**
+ * Reads a boolean value (suspending if no bytes available yet) or fails if channel has been closed
+ * and not enough bytes.
+ */
+ suspend fun readBoolean(): Boolean
+
+ /**
+ * Reads a double number (suspending if not enough bytes available) or fails if channel has been closed
+ * and not enough bytes.
+ */
+ suspend fun readDouble(): Double
+
+ /**
+ * Reads a float number (suspending if not enough bytes available) or fails if channel has been closed
+ * and not enough bytes.
+ */
+ suspend fun readFloat(): Float
+
+ suspend fun lookAhead(visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean) // NOTE(review): undocumented; contract of visitor's arguments and result is not visible here — confirm
+
+ /**
+ * Reads a line of UTF-8 characters to the specified [out] buffer up to [limit] characters.
+ * Supports both CR-LF and LF line endings.
+ * Throws an exception if the specified [limit] has been exceeded.
+ *
+ * @return `true` if line has been read (possibly empty) or `false` if channel has been closed
+ * and no characters were read.
+ */
+ suspend fun <A : Appendable> readUTF8LineTo(out: A, limit: Int = Int.MAX_VALUE): Boolean
+}
+
+/**
+ * Pumps this channel into [dst] until [readAvailable] reports -1, then closes [dst] and returns the number of bytes copied. On failure [dst] is closed with the failure, which is rethrown; the pooled buffer is recycled in every case.
+ */
+suspend fun ByteReadChannel.copyAndClose(dst: ByteWriteChannel): Long {
+    val pooled = BufferObjectPool.borrow()
+    val chunk = pooled.backingBuffer
+    var total = 0L
+    try {
+        while (true) {
+            chunk.clear()
+            val count = readAvailable(chunk)
+            if (count == -1) break // -1: the source is closed and drained
+            chunk.flip()
+            dst.writeFully(chunk)
+            total += count
+        }
+        dst.close()
+    } catch (failure: Throwable) {
+        dst.close(failure)
+        throw failure
+    } finally {
+        BufferObjectPool.recycle(pooled)
+    }
+    return total
+}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteWriteChannel.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteWriteChannel.kt
new file mode 100644
index 0000000..d654e29
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteWriteChannel.kt
@@ -0,0 +1,163 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.io.packet.*
+import java.nio.ByteBuffer
+import java.nio.CharBuffer
+import java.util.concurrent.CancellationException
+
+/**
+ * Channel for asynchronous writing of sequences of bytes.
+ * This is a **single-writer channel**.
+ *
+ * Operations on this channel cannot be invoked concurrently, unless explicitly specified otherwise
+ * in description. Exceptions are [close] and [flush].
+ */
+public interface ByteWriteChannel {
+ /**
+ * Returns number of bytes that can be written without suspension. Write operations do not suspend and return
+ * immediately when this number is at least the number of bytes requested for write.
+ */
+ public val availableForWrite: Int
+
+ /**
+ * Returns `true` if channel has been closed and attempting to write to the channel will cause an exception.
+ */
+ public val isClosedForWrite: Boolean
+
+ /**
+ * Returns `true` if channel flushes automatically all pending bytes after every write function call.
+ * If `false` then flush only happens at manual [flush] invocation or when the buffer is full.
+ */
+ public val autoFlush: Boolean
+
+ /**
+ * Byte order that is used for multi-byte write operations
+ * (such as [writeShort], [writeInt], [writeLong], [writeFloat], and [writeDouble]).
+ */
+ public var writeByteOrder: ByteOrder
+
+ /**
+ * Writes as much as possible and only suspends if buffer is full
+ */
+ suspend fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int
+ suspend fun writeAvailable(src: ByteArray) = writeAvailable(src, 0, src.size) // whole-array shorthand for the overload above
+ suspend fun writeAvailable(src: ByteBuffer): Int
+
+ /**
+ * Writes all [src] bytes and suspends until all bytes written. Causes flush if buffer filled up or when [autoFlush]
+ * Fails if the channel gets closed while writing.
+ */
+ suspend fun writeFully(src: ByteArray, offset: Int, length: Int)
+ suspend fun writeFully(src: ByteArray) = writeFully(src, 0, src.size) // whole-array shorthand for the overload above
+ suspend fun writeFully(src: ByteBuffer)
+
+ /**
+ * Writes a [packet] fully or fails if the channel gets closed before the whole packet has been written
+ */
+ suspend fun writePacket(packet: ByteReadPacket)
+ /**
+ * Writes long number and suspends until written.
+ * Fails if the channel gets closed while writing.
+ */
+ suspend fun writeLong(l: Long)
+
+ /**
+ * Writes int number and suspends until written.
+ * Fails if the channel gets closed while writing.
+ */
+ suspend fun writeInt(i: Int)
+
+ /**
+ * Writes short number and suspends until written.
+ * Fails if the channel gets closed while writing.
+ */
+ suspend fun writeShort(s: Short)
+
+ /**
+ * Writes byte and suspends until written.
+ * Fails if the channel gets closed while writing.
+ */
+ suspend fun writeByte(b: Byte)
+
+ /**
+ * Writes double number and suspends until written.
+ * Fails if the channel gets closed while writing.
+ */
+ suspend fun writeDouble(d: Double)
+
+ /**
+ * Writes float number and suspends until written.
+ * Fails if the channel gets closed while writing.
+ */
+ suspend fun writeFloat(f: Float)
+
+ /**
+ * Closes this channel with an optional exceptional [cause].
+ * It flushes all pending write bytes (via [flush]).
+ * This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
+ *
+ * A channel that was closed without a [cause], is considered to be _closed normally_.
+ * A channel that was closed with non-null [cause] is called a _failed channel_. Attempts to read or
+ * write on a failed channel throw this cause exception.
+ *
+ * After invocation of this operation [isClosedForWrite] starts returning `true` and
+ * all subsequent write operations throw [ClosedWriteChannelException] or the specified [cause].
+ * However, [isClosedForRead][ByteReadChannel.isClosedForRead] on the side of [ByteReadChannel]
+ * starts returning `true` only after all written bytes have been read.
+ */
+ public fun close(cause: Throwable? = null): Boolean
+
+ /**
+ * Flushes all pending write bytes making them available for read.
+ *
+ * This function is thread-safe and can be invoked in any thread at any time.
+ * It does nothing when invoked on a closed channel.
+ */
+ public fun flush()
+}
+
+/** Writes the low 16 bits of [s] as a short. */
+suspend fun ByteWriteChannel.writeShort(s: Int): Unit =
+    writeShort((s and 0xffff).toShort())
+
+/** Writes the low 8 bits of [b] as a byte. */
+suspend fun ByteWriteChannel.writeByte(b: Int): Unit =
+    writeByte((b and 0xff).toByte())
+
+/** Writes [i] after converting it with `toInt()`. */
+suspend fun ByteWriteChannel.writeInt(i: Long): Unit =
+    writeInt(i.toInt())
+
+/** Builds a packet from the character sequence [s] with [buildPacket] and writes it via [writePacket]. */
+suspend fun ByteWriteChannel.writeStringUtf8(s: CharSequence) {
+    writePacket(
+        buildPacket { writeStringUtf8(s) }
+    )
+}
+
+/** Builds a packet from the char buffer [s] with [buildPacket] and writes it via [writePacket]. */
+suspend fun ByteWriteChannel.writeStringUtf8(s: CharBuffer) {
+    writePacket(
+        buildPacket { writeStringUtf8(s) }
+    )
+}
+
+/** Builds a packet from the string [s] with [buildPacket] and writes it via [writePacket]. */
+suspend fun ByteWriteChannel.writeStringUtf8(s: String) {
+    writePacket(
+        buildPacket { writeStringUtf8(s) }
+    )
+}
+
+/** Writes [b] as one byte: 1 when `true`, 0 when `false`. */
+suspend fun ByteWriteChannel.writeBoolean(b: Boolean): Unit =
+    writeByte(if (b) 1 else 0)
+
+/** Writes the UTF-16 code unit of [ch] as a short. */
+suspend fun ByteWriteChannel.writeChar(ch: Char) {
+    // an Int argument selects the writeShort(Int) extension, which keeps the low 16 bits
+    val codeUnit: Int = ch.toInt()
+    writeShort(codeUnit)
+}
+
+class ClosedWriteChannelException(message: String?) : CancellationException(message) // named by ByteWriteChannel.close as the exception thrown by writes after a close without cause
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/MigrationUtils.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/MigrationUtils.kt
new file mode 100644
index 0000000..e4815c2
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/MigrationUtils.kt
@@ -0,0 +1,39 @@
+package kotlinx.coroutines.experimental.io
+
+/**
+ * See [java.io.DataOutput.writeUTF]. This deprecated alias forwards to [writeStringUtf8].
+ */
+@Deprecated("Use writeStringUtf8 instead", ReplaceWith("writeStringUtf8(s)"))
+suspend fun ByteWriteChannel.writeUTF(s: String): Unit =
+    // kept for migration only
+    writeStringUtf8(s)
+
+/**
+ * See [java.io.DataOutput.writeChars]: every char of [s] is written through [writeShort].
+ */
+suspend fun ByteWriteChannel.writeChars(s: String) {
+    for (index in s.indices) {
+        writeShort(s[index].toInt())
+    }
+}
+
+/**
+ * See [java.io.DataOutput.writeBytes]: every char of [s] is written through [writeByte].
+ */
+suspend fun ByteWriteChannel.writeBytes(s: String) {
+    // the writeByte(Int) extension keeps only the low 8 bits of each char code
+    var index = 0
+    while (index < s.length) writeByte(s[index++].toInt())
+}
+
+/** See [java.io.DataOutput.write]; deprecated alias that forwards to [writeFully]. */
+@Deprecated("Use writeFully instead", ReplaceWith("writeFully(src)"))
+suspend fun ByteWriteChannel.write(src: ByteArray) {
+    writeFully(src)
+}
+
+/** See [java.io.DataOutput.write]; deprecated alias that forwards to [writeFully]. */
+@Deprecated("Use writeFully instead", ReplaceWith("writeFully(src, offset, length)"))
+suspend fun ByteWriteChannel.write(src: ByteArray, offset: Int, length: Int) {
+    writeFully(src, offset, length)
+}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ReaderJob.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ReaderJob.kt
new file mode 100644
index 0000000..e88adb5
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ReaderJob.kt
@@ -0,0 +1,34 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * A coroutine job that is reading from a byte channel.
+ */
+interface ReaderJob : Job {
+ /**
+ * A reference to the channel that this coroutine is reading from, exposed through its [ByteWriteChannel] interface.
+ */
+ val channel: ByteWriteChannel
+}
+
+interface ReaderScope : CoroutineScope { // receiver type of the block passed to reader()
+ val channel: ByteReadChannel // the channel, exposed to the block through its read interface
+}
+
+/** Starts [block] as a coroutine whose [ReaderScope] is backed by [channel] and returns its job. */
+fun reader(coroutineContext: CoroutineContext,
+           channel: ByteChannel,
+           block: suspend ReaderScope.() -> Unit): ReaderJob =
+    ReaderCoroutine(newCoroutineContext(coroutineContext), channel).also { job ->
+        job.initParentJob(coroutineContext[Job])
+        block.startCoroutine(job, job)
+    }
+
+/** Overload of [reader] that creates its own channel with `ByteChannel(autoFlush)`. */
+fun reader(coroutineContext: CoroutineContext, autoFlush: Boolean = false, block: suspend ReaderScope.() -> Unit): ReaderJob =
+    reader(coroutineContext = coroutineContext, channel = ByteChannel(autoFlush), block = block)
+
+private class ReaderCoroutine(context: CoroutineContext, channel: ByteChannel) // one object acts as both the job and the scope
+ : ByteChannelCoroutine(context, channel), ReaderJob, ReaderScope
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/Strings.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/Strings.kt
new file mode 100644
index 0000000..c6923cb
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/Strings.kt
@@ -0,0 +1,14 @@
+package kotlinx.coroutines.experimental.io
+
+/** Reads one line via [readUTF8LineTo]; returns `null` when that call reports `false`. */
+suspend fun ByteReadChannel.readASCIILine(estimate: Int = 16, limit: Int = Int.MAX_VALUE): String? =
+    // NOTE(review): despite the name, this uses the same UTF-8 line reader as readUTF8Line
+    StringBuilder(estimate).takeIf { readUTF8LineTo(it, limit) }?.toString()
+
+/** Reads one line via [readUTF8LineTo] into a builder of initial capacity [estimate]; `null` when that call reports `false`. */
+suspend fun ByteReadChannel.readUTF8Line(estimate: Int = 16, limit: Int = Int.MAX_VALUE): String? {
+    val line = StringBuilder(estimate)
+    return when { readUTF8LineTo(line, limit) -> line.toString(); else -> null } }
+
+@Deprecated("Use readUTF8Line or readASCIILine instead", ReplaceWith("readUTF8Line(estimate, limit)"))
+suspend fun ByteReadChannel.readLine(estimate: Int = 16, limit: Int = Int.MAX_VALUE): String? = readUTF8Line(estimate = estimate, limit = limit) // deprecated alias
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/WriterJob.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/WriterJob.kt
new file mode 100644
index 0000000..00e2f71
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/WriterJob.kt
@@ -0,0 +1,35 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * A coroutine job that is writing to a byte channel.
+ */
+interface WriterJob : Job {
+ /**
+ * A reference to the channel that this coroutine is writing to, exposed through its [ByteReadChannel] interface.
+ */
+ val channel: ByteReadChannel
+}
+
+interface WriterScope : CoroutineScope { // scope implemented by WriterCoroutine
+ val channel: ByteWriteChannel // the channel, exposed through its write interface
+}
+
+/** Starts [block] as a coroutine with a [WriterScope] receiver backed by [channel] and returns its job. */
+fun writer(coroutineContext: CoroutineContext, channel: ByteChannel, block: suspend WriterScope.() -> Unit): WriterJob {
+    val coroutine = WriterCoroutine(newCoroutineContext(coroutineContext), channel)
+    coroutine.initParentJob(coroutineContext[Job])
+    // the receiver is a WriterScope (not a bare CoroutineScope) so the block can reach `channel`, mirroring reader()
+    block.startCoroutine(coroutine, coroutine)
+    return coroutine
+}
+
+/** Overload that creates its own channel with `ByteChannel(autoFlush)` before starting [block]. */
+fun writer(coroutineContext: CoroutineContext, autoFlush: Boolean = false,
+           block: suspend WriterScope.() -> Unit): WriterJob = writer(coroutineContext, ByteChannel(autoFlush), block)
+
+private class WriterCoroutine(ctx: CoroutineContext, channel: ByteChannel) // one object acts as both the job and the scope
+ : ByteChannelCoroutine(ctx, channel), WriterScope, WriterJob
+
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/ObjectPool.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/ObjectPool.kt
new file mode 100644
index 0000000..822a1c8
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/ObjectPool.kt
@@ -0,0 +1,144 @@
+package kotlinx.coroutines.experimental.io.internal
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicReferenceArray
+
+internal interface ObjectPool<T : Any> { // source of reusable instances of T
+ val capacity: Int
+ fun borrow(): T // obtains an instance for use
+ fun recycle(instance: T) // can only recycle what was borrowed before
+ fun dispose() // releases whatever the pool currently holds
+}
+
+internal val BUFFER_SIZE = getIOIntProperty("BufferSize", 4096) // capacity of each pooled ByteBuffer; presumably 4096 is the fallback when the property is unset — confirm
+private val BUFFER_POOL_SIZE = getIOIntProperty("BufferPoolSize", 2048) // capacity passed to BufferPool
+private val BUFFER_OBJECT_POOL_SIZE = getIOIntProperty("BufferObjectPoolSize", 1024) // capacity passed to BufferObjectPool
+
+// ------------- standard shared pool objects -------------
+
+// Shared pool of direct byte buffers, each BUFFER_SIZE bytes large.
+internal val BufferPool: ObjectPool<ByteBuffer> = object : ObjectPoolImpl<ByteBuffer>(BUFFER_POOL_SIZE) {
+    override fun produceInstance(): ByteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE)
+
+    // buffers taken from the pool are cleared before being handed out
+    override fun clearInstance(instance: ByteBuffer): ByteBuffer = instance.apply { clear() }
+
+    // only buffers of exactly BUFFER_SIZE capacity are accepted back
+    override fun validateInstance(instance: ByteBuffer) = require(instance.capacity() == BUFFER_SIZE)
+}
+
+// Shared pool of buffer states; their backing buffers are borrowed from and recycled to BufferPool.
+internal val BufferObjectPool: ObjectPool<ReadWriteBufferState.Initial> =
+    object : ObjectPoolImpl<ReadWriteBufferState.Initial>(BUFFER_OBJECT_POOL_SIZE) {
+        override fun produceInstance(): ReadWriteBufferState.Initial {
+            return ReadWriteBufferState.Initial(BufferPool.borrow())
+        }
+        override fun disposeInstance(instance: ReadWriteBufferState.Initial) = BufferPool.recycle(instance.backingBuffer)
+    }
+
+// Non-pooling variant: every borrow() allocates a fresh direct buffer and wraps it in a new state.
+internal val BufferObjectNoPool: ObjectPool<ReadWriteBufferState.Initial> = object : NoPoolImpl<ReadWriteBufferState.Initial>() {
+    override fun borrow(): ReadWriteBufferState.Initial =
+        ByteBuffer.allocateDirect(BUFFER_SIZE).let { fresh -> ReadWriteBufferState.Initial(fresh) }
+}
+
+// ------------- ObjectPoolImpl -------------
+
+private const val MULTIPLIER = 4 // the slot table is sized from capacity * MULTIPLIER
+private const val PROBE_COUNT = 8 // number of attempts to find a slot
+private const val MAGIC = 2654435769.toInt() // fractional part of golden ratio
+private const val MAX_CAPACITY = Int.MAX_VALUE / MULTIPLIER
+
+internal abstract class ObjectPoolImpl<T : Any>(final override val capacity: Int) : ObjectPool<T> {
+ init {
+ require(capacity > 0) { "capacity should be positive but it is $capacity" }
+ require(capacity <= MAX_CAPACITY) { "capacity should be less or equal to $MAX_CAPACITY but it is $capacity"}
+ }
+
+ protected abstract fun produceInstance(): T // factory
+ protected open fun clearInstance(instance: T) = instance // optional cleaning of popped items
+ protected open fun validateInstance(instance: T) {} // optional validation for recycled items
+ protected open fun disposeInstance(instance: T) {} // optional destruction of unpoolable items
+
+ @Volatile
+ private var top: Long = 0L // packed stack head: version counter in the high 32 bits, slot index in the low 32 bits
+
+ // closest power of 2 that is equal or larger than capacity * MULTIPLIER
+ private val maxIndex = Integer.highestOneBit(capacity * MULTIPLIER - 1) * 2 // NOTE(review): for capacity above 2^28 the `* 2` appears to overflow Int — confirm
+ private val shift = Integer.numberOfLeadingZeros(maxIndex) + 1 // for hash function
+
+ // zero index is reserved for both
+ private val instances = AtomicReferenceArray<T?>(maxIndex + 1)
+ private val next = IntArray(maxIndex + 1) // next[i] is the slot index that was on top when slot i was pushed
+
+ override fun borrow(): T =
+ tryPop()?.let { clearInstance(it) } ?: produceInstance()
+
+ override fun recycle(instance: T) {
+ validateInstance(instance)
+ if (!tryPush(instance)) disposeInstance(instance) // no free slot found within the probe limit
+ }
+
+ override fun dispose() {
+ while (true) {
+ val instance = tryPop() ?: return
+ disposeInstance(instance)
+ }
+ }
+
+ private fun tryPush(instance: T): Boolean { // probes up to PROBE_COUNT slots, starting at a hash-derived index
+ var index = ((System.identityHashCode(instance) * MAGIC) ushr shift) + 1
+ repeat (PROBE_COUNT) {
+ if (instances.compareAndSet(index, null, instance)) {
+ pushTop(index)
+ return true
+ }
+ if (--index == 0) index = maxIndex // slot 0 is reserved, so wrap around to the last slot
+ }
+ return false
+ }
+
+ private fun tryPop(): T? {
+ val index = popTop()
+ return if (index == 0) null else instances.getAndSet(index, null)
+ }
+
+ private fun pushTop(index: Int) {
+ require(index > 0)
+ while (true) { // lock-free loop on top
+ val top = this.top // volatile read
+ val topVersion = (top shr 32 and 0xffffffffL) + 1L
+ val topIndex = (top and 0xffffffffL).toInt()
+ val newTop = topVersion shl 32 or index.toLong()
+ next[index] = topIndex // link the new head to the previous head before publishing it
+ if (Top.compareAndSet(this, top, newTop)) return
+ }
+ }
+
+ private fun popTop(): Int { // returns the popped slot index, or 0 when the stack is empty
+ while (true) { // lock-free loop on top
+ val top = this.top // volatile read
+ if (top == 0L) return 0
+ val newVersion = (top shr 32 and 0xffffffffL) + 1L
+ val topIndex = (top and 0xffffffffL).toInt()
+ if (topIndex == 0) return 0
+ val next = next[topIndex]
+ val newTop = newVersion shl 32 or next.toLong()
+ if (Top.compareAndSet(this, top, newTop)) return topIndex
+ }
+ }
+
+ companion object {
+ // todo: replace with atomicfu, remove companion object
+ private val Top = longUpdater(ObjectPoolImpl<*>::top)
+ }
+}
+
+// ------------- NoPoolImpl -------------
+
+internal abstract class NoPoolImpl<T : Any> : ObjectPool<T> { // ObjectPool that retains nothing
+ override val capacity: Int get() = 0
+ override abstract fun borrow(): T // subclasses decide how an instance is created
+ override fun recycle(instance: T) {} // recycled instances are simply dropped
+ override fun dispose() {} // nothing is held, so there is nothing to dispose
+}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/ReadWriteBufferState.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/ReadWriteBufferState.kt
new file mode 100644
index 0000000..37ab97a
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/ReadWriteBufferState.kt
@@ -0,0 +1,91 @@
+package kotlinx.coroutines.experimental.io.internal
+
+import java.nio.ByteBuffer
+
+// MAGIC constant tied to the ByteBufferChannel code: how many extra bytes it needs reserved in a buffer
+internal const val RESERVED_SIZE = 8
+
+internal val EmptyByteBuffer: ByteBuffer = ByteBuffer.allocate(0) // zero-capacity buffer used by the buffer-less states
+internal val EmptyCapacity = RingBufferCapacity(0) // capacity tracker with a total capacity of zero
+
+internal sealed class ReadWriteBufferState(
+ @JvmField val backingBuffer: ByteBuffer,
+ @JvmField val capacity: RingBufferCapacity
+) {
+ open val idle: Boolean get() = false
+ open val readBuffer: ByteBuffer get() = error("read buffer is not available in state $this")
+ open val writeBuffer: ByteBuffer get() = error("write buffer is not available in state $this")
+ // transitions fail by default; each state overrides only the ones that are legal for it
+ internal open fun startReading(): ReadWriteBufferState = error("Reading is not available in state $this")
+ internal open fun startWriting(): ReadWriteBufferState = error("Writing is not available in state $this")
+ internal open fun stopReading(): ReadWriteBufferState = error("Unable to stop reading in state $this")
+ internal open fun stopWriting(): ReadWriteBufferState = error("Unable to stop writing in state $this")
+
+ object IdleEmpty : ReadWriteBufferState(EmptyByteBuffer, EmptyCapacity) { // idle state backed by the empty buffer; no transitions overridden
+ override val idle: Boolean get() = true
+ override fun toString() = "IDLE(empty)"
+ }
+
+ class Initial(
+ backingBuffer: ByteBuffer,
+ reservedSize: Int = RESERVED_SIZE
+ ) : ReadWriteBufferState(backingBuffer, RingBufferCapacity(backingBuffer.capacity() - reservedSize)) { // usable capacity excludes the reserved bytes
+ init {
+ require(backingBuffer.position() == 0)
+ require(backingBuffer.limit() == backingBuffer.capacity())
+ }
+ override val writeBuffer: ByteBuffer = backingBuffer.duplicate() // defensive copy of buffer's state
+ override val readBuffer: ByteBuffer = backingBuffer.duplicate() // must have a separate buffer state here
+ // all other possible states
+ internal val idleState = IdleNonEmpty(this)
+ internal val readingState = Reading(this)
+ internal val writingState = Writing(this)
+ internal val readingWritingState = ReadingWriting(this)
+ // state transitions
+ override fun startReading() = readingState
+ override fun startWriting() = writingState
+ override val idle: Boolean get() = error("Not available for initial state")
+ override fun toString() = "Initial"
+ }
+
+ class IdleNonEmpty internal constructor(
+ val initial: Initial // public here, so can release initial state when idle
+ ) : ReadWriteBufferState(initial.backingBuffer, initial.capacity) {
+ override fun startReading() = initial.readingState
+ override fun startWriting() = initial.writingState
+ override val idle: Boolean get() = true
+ override fun toString() = "IDLE(with buffer)"
+ }
+
+ class Reading internal constructor(
+ private val initial: Initial
+ ) : ReadWriteBufferState(initial.backingBuffer, initial.capacity) {
+ override val readBuffer: ByteBuffer get() = initial.readBuffer
+ override fun startWriting() = initial.readingWritingState // a writer joins the active reader
+ override fun stopReading() = initial.idleState
+ override fun toString() = "Reading"
+ }
+
+ class Writing internal constructor(
+ private val initial: Initial
+ ) : ReadWriteBufferState(initial.backingBuffer, initial.capacity) {
+ override val writeBuffer: ByteBuffer get() = initial.writeBuffer
+ override fun startReading() = initial.readingWritingState // a reader joins the active writer
+ override fun stopWriting() = initial.idleState
+ override fun toString() = "Writing"
+ }
+
+ class ReadingWriting internal constructor(
+ private val initial: Initial
+ ) : ReadWriteBufferState(initial.backingBuffer, initial.capacity) {
+ override val readBuffer: ByteBuffer get() = initial.readBuffer
+ override val writeBuffer: ByteBuffer get() = initial.writeBuffer
+ override fun stopReading() = initial.writingState // only the writer remains
+ override fun stopWriting() = initial.readingState // only the reader remains
+ override fun toString() = "Reading+Writing"
+ }
+
+ object Terminated : ReadWriteBufferState(EmptyByteBuffer, EmptyCapacity) { // non-idle state backed by the empty buffer; no transitions overridden
+ override fun toString() = "Terminated"
+ }
+}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/RingBufferCapacity.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/RingBufferCapacity.kt
new file mode 100644
index 0000000..529b817
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/RingBufferCapacity.kt
@@ -0,0 +1,109 @@
+package kotlinx.coroutines.experimental.io.internal
+
+internal class RingBufferCapacity(private val totalCapacity: Int) { // three counters: readable, writable, and written-but-not-yet-flushed
+ @Volatile @JvmField
+ var availableForRead = 0
+
+ @Volatile @JvmField
+ var availableForWrite = totalCapacity
+
+ @Volatile @JvmField
+ var pendingToFlush = 0
+
+ // concurrent unsafe!
+ fun resetForWrite() {
+ availableForRead = 0
+ availableForWrite = totalCapacity
+ pendingToFlush = 0
+ }
+
+ fun resetForRead() { // plain field writes like resetForWrite, so presumably just as concurrency-unsafe — confirm
+ availableForRead = totalCapacity
+ availableForWrite = 0
+ pendingToFlush = 0
+ }
+
+ fun tryReadExact(n: Int): Boolean { // takes exactly n from availableForRead, or nothing (false) if fewer are available
+ while (true) {
+ val remaining = availableForRead
+ if (remaining < n) return false
+ if (AvailableForRead.compareAndSet(this, remaining, remaining - n)) return true
+ }
+ }
+
+ fun tryReadAtMost(n: Int): Int { // takes up to n from availableForRead and returns how many were taken (possibly 0)
+ while (true) {
+ val remaining = availableForRead
+ val delta = minOf(n, remaining)
+ if (delta == 0) return 0
+ if (AvailableForRead.compareAndSet(this, remaining, remaining - delta)) return delta
+ }
+ }
+
+ fun tryWriteExact(n: Int): Boolean { // takes exactly n from availableForWrite, or nothing (false) if fewer are available
+ while (true) {
+ val remaining = availableForWrite
+ if (remaining < n) return false
+ if (AvailableForWrite.compareAndSet(this, remaining, remaining - n)) return true
+ }
+ }
+
+ fun tryWriteAtMost(n: Int): Int { // takes up to n from availableForWrite and returns how many were taken (possibly 0)
+ while (true) {
+ val remaining = availableForWrite
+ val delta = minOf(n, remaining)
+ if (delta == 0) return 0
+ if (AvailableForWrite.compareAndSet(this, remaining, remaining - delta)) return delta
+ }
+ }
+
+ fun completeRead(n: Int) { // gives n back to availableForWrite
+ while (true) {
+ val remaining = availableForWrite
+ val update = remaining + n
+ require(update <= totalCapacity) { "Completed read overflow: $remaining + $n = $update > $totalCapacity" }
+ if (AvailableForWrite.compareAndSet(this, remaining, update)) break
+ }
+ }
+
+ fun completeWrite(n: Int) { // adds n to pendingToFlush; availableForRead only grows on flush()
+ while (true) {
+ val pending = pendingToFlush
+ val update = pending + n
+ require(update <= totalCapacity) { "Complete write overflow: $pending + $n > $totalCapacity" }
+ if (PendingToFlush.compareAndSet(this, pending, update)) break
+ }
+ }
+
+ /**
+ * Moves everything counted in [pendingToFlush] over to [availableForRead].
+ * @return true if there are bytes available for read after flush
+ */
+ fun flush(): Boolean {
+ val pending = PendingToFlush.getAndSet(this, 0)
+ while (true) {
+ val remaining = availableForRead
+ val update = remaining + pending
+ if (remaining == update || AvailableForRead.compareAndSet(this, remaining, update)) {
+ return update > 0
+ }
+ }
+ }
+
+ fun tryLockForRelease(): Boolean { // succeeds only when nothing is pending or readable and all capacity is writable; then sets availableForWrite to 0
+ while (true) {
+ val remaining = availableForWrite
+ if (pendingToFlush > 0 || availableForRead > 0 || remaining != totalCapacity) return false
+ if (AvailableForWrite.compareAndSet(this, remaining, 0)) return true
+ }
+ }
+
+ fun isEmpty(): Boolean = availableForWrite == totalCapacity
+ fun isFull(): Boolean = availableForWrite == 0
+
+ companion object {
+ // todo: replace with atomicfu, remove companion object
+ private val AvailableForRead = intUpdater(RingBufferCapacity::availableForRead)
+ private val AvailableForWrite = intUpdater(RingBufferCapacity::availableForWrite)
+ private val PendingToFlush = intUpdater(RingBufferCapacity::pendingToFlush)
+ }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/Strings.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/Strings.kt
new file mode 100644
index 0000000..06161ab
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/Strings.kt
@@ -0,0 +1,108 @@
+package kotlinx.coroutines.experimental.io.internal
+
+import java.nio.*
+import java.nio.charset.*
+
+/**
+ * Reads bytes from the current position up to the limit and hands each one to [consumer] as an ASCII character.
+ * Processing stops at the first byte that has its high bit set, or at the first character for which
+ * [consumer] returns `false`; in both cases that byte is pushed back, so the position points at it.
+ *
+ * @return `true` if every remaining byte was consumed, `false` if processing stopped early
+ */
+internal inline fun ByteBuffer.decodeASCII(consumer: (Char) -> Boolean): Boolean {
+    while (hasRemaining()) {
+        val code = get().toInt() and 0xff
+        val accepted = code < 0x80 && consumer(code.toChar())
+        if (!accepted) {
+            // un-read the byte that stopped the loop
+            position(position() - 1)
+            return false
+        }
+    }
+
+    return true
+}
+
+/**
+ * Decodes UTF-8 bytes starting at the current position and passes every resulting UTF-16 char to [consumer].
+ * Code points outside the BMP are delivered as a high surrogate followed by a low surrogate.
+ *
+ * Decoding stops when the buffer is exhausted, when the buffer ends in the middle of a character,
+ * or when [consumer] returns `false`. When [consumer] rejects a char, the position is moved back to the first
+ * byte of the rejected character. Note that if only the low surrogate of a pair is rejected, the high
+ * surrogate has already been handed to [consumer] and cannot be taken back.
+ *
+ * @return `0` if all bytes were processed, `-1` if [consumer] returned `false`, or the total number of bytes
+ * of the trailing character when the buffer does not contain all of them (the position is then left at
+ * that character's first byte)
+ * @throws MalformedInputException if a continuation byte is missing or appears where a lead byte is expected
+ * @throws IllegalArgumentException if a decoded value is not a valid code point
+ */
+internal inline fun ByteBuffer.decodeUTF8(consumer: (Char) -> Boolean): Int {
+    var byteCount = 0      // continuation bytes still expected for the current character
+    var value = 0          // code point bits accumulated so far
+    var lastByteCount = 0  // total encoded length of the current character
+
+    while (hasRemaining()) {
+        val v = get().toInt() and 0xff
+        when {
+            (v and 0x80) == 0 -> {
+                // ASCII byte: not allowed in the middle of a multi-byte character
+                if (byteCount != 0) throw MalformedInputException(0)
+                if (!consumer(v.toChar())) {
+                    position(position() - 1)
+                    return -1
+                }
+            }
+            byteCount == 0 -> {
+                // first unicode byte: the number of leading 1 bits is the encoded length
+
+                var mask = 0x80
+                value = v
+
+                for (i in 1..6) { // TODO do we support 6 bytes unicode?
+                    if (value and mask != 0) {
+                        value = value and mask.inv()
+                        mask = mask shr 1
+                        byteCount++
+                    } else {
+                        break
+                    }
+                }
+
+                // A single leading 1 bit (10xxxxxx) is a continuation byte without a lead byte.
+                // It used to be dropped silently; report it instead.
+                if (byteCount == 1) throw MalformedInputException(0)
+
+                lastByteCount = byteCount
+                byteCount--
+
+                if (byteCount > remaining()) {
+                    position(position() - 1) // return one byte back
+                    return lastByteCount
+                }
+            }
+            else -> {
+                // trailing unicode byte: must look like 10xxxxxx and carries 6 payload bits
+                if ((v and 0xc0) != 0x80) throw MalformedInputException(0)
+                value = (value shl 6) or (v and 0x3f)
+                byteCount--
+
+                if (byteCount == 0) {
+                    if (isBmpCodePoint(value)) {
+                        if (!consumer(value.toChar())) {
+                            position(position() - lastByteCount)
+                            return -1
+                        }
+                    } else if (!isValidCodePoint(value)) {
+                        throw IllegalArgumentException("Malformed code-point ${Integer.toHexString(value)} found")
+                    } else {
+                        if (!consumer(highSurrogate(value).toChar()) ||
+                                !consumer(lowSurrogate(value).toChar())) {
+                            position(position() - lastByteCount)
+                            return -1
+                        }
+                    }
+
+                    value = 0
+                }
+            }
+        }
+    }
+
+    return 0
+}
+
+// Unicode / UTF-16 constants used when splitting a supplementary code point into a surrogate pair.
+private const val MaxCodePoint = 0x10ffff
+private const val MinLowSurrogate = 0xdc00
+private const val MinHighSurrogate = 0xd800
+private const val MinSupplementary = 0x10000
+// Offset such that (cp ushr 10) + HighSurrogateMagic yields the high surrogate of a supplementary code point.
+private const val HighSurrogateMagic = MinHighSurrogate - (MinSupplementary ushr 10)
+
+/** `true` if [cp] fits into 16 bits, i.e. can be represented by a single UTF-16 char. */
+private fun isBmpCodePoint(cp: Int) = cp ushr 16 == 0
+
+/**
+ * `true` if [codePoint] lies in `0..0x10ffff`. Negative values are rejected as well: they can be produced
+ * by the decoder when a sequence accumulates more than 31 payload bits.
+ */
+private fun isValidCodePoint(codePoint: Int) = codePoint in 0..MaxCodePoint
+
+/** Low (trailing) surrogate of the supplementary code point [cp], as an Int. */
+private fun lowSurrogate(cp: Int) = (cp and 0x3ff) + MinLowSurrogate
+
+/** High (leading) surrogate of the supplementary code point [cp], as an Int. */
+private fun highSurrogate(cp: Int) = (cp ushr 10) + HighSurrogateMagic
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/Utils.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/Utils.kt
new file mode 100644
index 0000000..a5be826
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/Utils.kt
@@ -0,0 +1,26 @@
+package kotlinx.coroutines.experimental.io.internal
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
+import java.util.concurrent.atomic.AtomicLongFieldUpdater
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+import kotlin.reflect.KProperty1
+
+/** Returns `true` when this buffer has no bytes left between its position and its limit. */
+internal fun ByteBuffer.isEmpty(): Boolean {
+    return !hasRemaining()
+}
+
+/**
+ * Creates an [AtomicLongFieldUpdater] for the field of [Owner] that has the same name as property [p].
+ */
+internal inline fun <reified Owner : Any> longUpdater(p: KProperty1<Owner, Long>): AtomicLongFieldUpdater<Owner> =
+        AtomicLongFieldUpdater.newUpdater(Owner::class.java, p.name)
+
+/**
+ * Creates an [AtomicIntegerFieldUpdater] for the field of [Owner] that has the same name as property [p].
+ */
+internal inline fun <reified Owner : Any> intUpdater(p: KProperty1<Owner, Int>): AtomicIntegerFieldUpdater<Owner> =
+        AtomicIntegerFieldUpdater.newUpdater(Owner::class.java, p.name)
+
+/**
+ * Creates an [AtomicReferenceFieldUpdater] for the field of [Owner] that has the same name as property [p]
+ * and holds values of type [T].
+ */
+internal inline fun <reified Owner : Any, reified T> updater(p: KProperty1<Owner, T>): AtomicReferenceFieldUpdater<Owner, T> =
+        AtomicReferenceFieldUpdater.newUpdater(Owner::class.java, T::class.java, p.name)
+
+/**
+ * Reads the system property `kotlinx.coroutines.io.<name>` as an integer.
+ *
+ * @return the parsed value, or [default] when the property is not set, is not a valid Int,
+ * or cannot be read because of a [SecurityException]
+ */
+internal fun getIOIntProperty(name: String, default: Int): Int {
+    val raw: String? = try {
+        System.getProperty("kotlinx.coroutines.io.$name")
+    } catch (e: SecurityException) {
+        null
+    }
+    return raw?.toIntOrNull() ?: default
+}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacket.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacket.kt
new file mode 100644
index 0000000..62ce16b
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacket.kt
@@ -0,0 +1,38 @@
+package kotlinx.coroutines.experimental.io.packet
+
+import java.io.*
+import java.nio.*
+import kotlin.experimental.*
+
+/**
+ * Read side of a packet: a finite sequence of bytes that is consumed by the `read*` / `skip*` functions.
+ *
+ * NOTE(review): the per-member notes below are derived from the implementations that accompany this
+ * interface (`ByteReadPacketEmpty`, `ByteReadPacketImpl`, `ByteReadPacketSingle`); confirm before treating
+ * them as a binding contract for other implementations.
+ */
+interface ByteReadPacket {
+ /** Number of bytes that can still be read from this packet. */
+ val remaining: Int
+
+ /** Copies up to [length] bytes into [dst] starting at [offset]; returns the number copied, or -1 if nothing is left. */
+ fun readLazy(dst: ByteArray, offset: Int, length: Int): Int
+ /** Same as the three-argument overload, using the whole array. */
+ fun readLazy(dst: ByteArray) = readLazy(dst, 0, dst.size)
+ /** Copies as many bytes as fit into [dst]; returns the number copied, or -1 if nothing is left. */
+ fun readLazy(dst: ByteBuffer): Int
+
+ /** Copies exactly [length] bytes into [dst] starting at [offset]; implementations throw `EOFException` when the packet is too short. */
+ fun readFully(dst: ByteArray, offset: Int, length: Int)
+ /** Same as the three-argument overload, filling the whole array. */
+ fun readFully(dst: ByteArray) = readFully(dst, 0, dst.size)
+ /** Fills [dst] completely; implementations throw `EOFException` when the packet is too short. */
+ fun readFully(dst: ByteBuffer): Int
+
+ // Fixed-size reads; the implementations in this package throw EOFException when the packet cannot supply the value.
+ fun readLong(): Long
+ fun readInt(): Int
+ fun readShort(): Short
+ fun readByte(): Byte
+
+ // Unsigned variants: the signed value is widened and masked so the result is non-negative.
+ fun readUInt(): Long = readInt().toLong() and 0xffffffff
+ fun readUShort(): Int = readShort().toInt() and 0xffff
+ fun readUByte(): Short = readByte().toShort() and 0xff
+
+ fun readDouble(): Double
+ fun readFloat(): Float
+
+ /** Skips up to [n] bytes and returns the number actually skipped. */
+ fun skip(n: Int): Int
+ /** Skips exactly [n] bytes; implementations throw `EOFException` when fewer are available. */
+ fun skipExact(n: Int)
+
+ /** Releases the resources held by the packet (the implementations hand their buffers back to a pool). */
+ fun release()
+
+ /** Appends the characters of the next line to [out], at most [limit] of them; returns `false` if there was nothing to read. */
+ fun readUTF8LineTo(out: Appendable, limit: Int = Int.MAX_VALUE): Boolean
+
+ /** Returns an [InputStream] view that reads from this packet. */
+ fun inputStream(): InputStream
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketEmpty.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketEmpty.kt
new file mode 100644
index 0000000..c278890
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketEmpty.kt
@@ -0,0 +1,80 @@
+package kotlinx.coroutines.experimental.io.packet
+
+import java.io.*
+import java.nio.*
+
+/**
+ * [ByteReadPacket] that contains no bytes: lazy reads report end of packet (`-1`), reads of a fixed
+ * non-zero size throw [EOFException], and [release] does nothing.
+ */
+object ByteReadPacketEmpty : ByteReadPacket {
+    override val remaining: Int
+        get() = 0
+
+    override fun readLazy(dst: ByteArray, offset: Int, length: Int): Int = -1
+
+    override fun readLazy(dst: ByteBuffer): Int = -1
+
+    /** Succeeds only for a non-positive [length]. */
+    override fun readFully(dst: ByteArray, offset: Int, length: Int) {
+        if (length <= 0) return
+        throw EOFException("Couldn't read $length bytes from empty packet")
+    }
+
+    /** Succeeds (returning 0) only when [dst] has no room left. */
+    override fun readFully(dst: ByteBuffer): Int {
+        if (!dst.hasRemaining()) return 0
+        throw EOFException("Couldn't read ${dst.remaining()} bytes from empty packet")
+    }
+
+    override fun readLong(): Long = throw EOFException("Couldn't read long from empty packet")
+
+    override fun readInt(): Int = throw EOFException("Couldn't read int from empty packet")
+
+    override fun readShort(): Short = throw EOFException("Couldn't read short from empty packet")
+
+    override fun readByte(): Byte = throw EOFException("Couldn't read byte from empty packet")
+
+    override fun readDouble(): Double = throw EOFException("Couldn't read double from empty packet")
+
+    override fun readFloat(): Float = throw EOFException("Couldn't read float from empty packet")
+
+    override fun skip(n: Int): Int = 0
+
+    override fun skipExact(n: Int) {
+        if (n == 0) return
+        throw EOFException("Couldn't skip $n bytes in empty packet")
+    }
+
+    override fun release() = Unit
+
+    override fun readUTF8LineTo(out: Appendable, limit: Int): Boolean = false
+
+    override fun inputStream(): InputStream = EmptyInputStream
+
+    /** Shared stream that is always at end of input. */
+    private val EmptyInputStream = object : InputStream() {
+        override fun available(): Int = 0
+
+        override fun read(): Int = -1
+        override fun read(b: ByteArray?, off: Int, len: Int): Int = -1
+        override fun read(b: ByteArray?): Int = -1
+
+        override fun skip(n: Long): Long = 0L
+
+        // mark/reset are trivially supported because there is no position to restore
+        override fun markSupported(): Boolean = true
+        override fun mark(readlimit: Int) = Unit
+        override fun reset() = Unit
+
+        override fun close() = Unit
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt
new file mode 100644
index 0000000..dd9fad2
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt
@@ -0,0 +1,240 @@
+@file:Suppress("UsePropertyAccessSyntax")
+
+package kotlinx.coroutines.experimental.io.packet
+
+import kotlinx.coroutines.experimental.io.internal.*
+import java.io.*
+import java.nio.*
+import java.nio.charset.*
+import java.util.*
+
+/**
+ * [ByteReadPacket] backed by a queue of byte buffers that are read front to back.
+ * A buffer is removed from the queue and handed to [pool] as soon as it has been read completely.
+ */
+internal class ByteReadPacketImpl(private val packets: ArrayDeque<ByteBuffer>, internal val pool: ObjectPool<ByteBuffer>) : ByteReadPacket {
+    /** Total number of unread bytes across all queued buffers. */
+    override val remaining: Int
+        get() = packets.sumBy { it.remaining() }
+
+    /** Removes and returns the first queued buffer; throws [IllegalStateException] when the queue is empty. */
+    internal fun steal(): ByteBuffer = packets.pollFirst() ?: throw IllegalStateException("EOF")
+
+    /** Copies up to [length] bytes into [dst]; returns the number copied or -1 when the packet is exhausted. */
+    override fun readLazy(dst: ByteArray, offset: Int, length: Int): Int {
+        var copied = 0
+
+        val rc = reading(0) { buffer ->
+            val size = minOf(buffer.remaining(), length - copied)
+            buffer.get(dst, offset + copied, size)
+            copied += size
+
+            copied < length
+        }
+
+        return if (rc) copied else -1
+    }
+
+    /** Copies as many bytes as fit into [dst]; returns the number copied or -1 when the packet is exhausted. */
+    override fun readLazy(dst: ByteBuffer): Int {
+        var copied = 0
+
+        val rc = reading(0) { buffer ->
+            if (dst.remaining() >= buffer.remaining()) {
+                copied += buffer.remaining()
+                dst.put(buffer)
+            } else {
+                while (dst.hasRemaining() && buffer.hasRemaining()) {
+                    dst.put(buffer.get())
+                    copied++
+                }
+            }
+
+            dst.hasRemaining()
+        }
+
+        return if (rc) copied else -1
+    }
+
+    /** Copies exactly [length] bytes into [dst]; throws [EOFException] if the packet holds fewer. */
+    override fun readFully(dst: ByteArray, offset: Int, length: Int) {
+        val rc = readLazy(dst, offset, length)
+        // readLazy reports -1 on an exhausted packet; that is not an error when nothing was requested
+        if (length > 0 && rc < length) throw EOFException("Not enough bytes in the packet")
+    }
+
+    /** Fills [dst] completely and returns the number of bytes copied; throws [EOFException] if the packet holds fewer. */
+    override fun readFully(dst: ByteBuffer): Int {
+        val rc = readLazy(dst)
+        if (dst.hasRemaining()) throw EOFException("Not enough bytes in the packet")
+        // rc is -1 when dst had no room and the packet is exhausted: zero bytes were copied
+        return maxOf(rc, 0)
+    }
+
+    override fun readLong(): Long {
+        var v = 0L
+        val rc = reading(8) { v = it.getLong(); false }
+        if (!rc) throw EOFException("Couldn't read long from empty packet")
+        return v
+    }
+
+    override fun readInt(): Int {
+        var v = 0
+        val rc = reading(4) { v = it.getInt(); false }
+        if (!rc) throw EOFException("Couldn't read int from empty packet")
+        return v
+    }
+
+    override fun readShort(): Short {
+        var v: Short = 0
+        val rc = reading(2) { v = it.getShort(); false }
+        if (!rc) throw EOFException("Couldn't read short from empty packet")
+        return v
+    }
+
+    override fun readByte(): Byte {
+        var v: Byte = 0
+        val rc = reading(1) { v = it.get(); false }
+        if (!rc) throw EOFException("Couldn't read byte from empty packet")
+        return v
+    }
+
+    override fun readDouble(): Double {
+        var v = 0.0
+        val rc = reading(8) { v = it.getDouble(); false }
+        if (!rc) throw EOFException("Couldn't read double from empty packet")
+        return v
+    }
+
+    override fun readFloat(): Float {
+        var v = 0.0f
+        val rc = reading(4) { v = it.getFloat(); false }
+        if (!rc) throw EOFException("Couldn't read float from empty packet")
+        return v
+    }
+
+    /**
+     * Appends the characters of the next line to [out]. A line ends at `\n`, `\r\n`, or a `\r` that is
+     * followed by any other character (which is left unread).
+     *
+     * NOTE(review): `reading` receives the value of `size` at call time (1), so a multi-byte character that is
+     * split across two buffers ends the line early instead of being joined — confirm whether producers
+     * can create such packets.
+     *
+     * @return `false` if there was nothing to read
+     * @throws BufferOverflowException if the line has more than [limit] characters
+     */
+    override fun readUTF8LineTo(out: Appendable, limit: Int): Boolean {
+        var decoded = 0
+        var size = 1
+        var cr = false
+        var end = false
+
+        val rc = reading(size) { bb ->
+            size = bb.decodeUTF8 { ch ->
+                when (ch) {
+                    '\r' -> {
+                        if (cr) {
+                            end = true
+                            return@decodeUTF8 false
+                        }
+                        cr = true
+                        true
+                    }
+                    '\n' -> {
+                        // the terminator has been consumed: leave the whole function
+                        return true
+                    }
+                    else -> {
+                        if (cr) {
+                            end = true
+                            return@decodeUTF8 false
+                        }
+
+                        if (decoded == limit) throw BufferOverflowException()
+                        decoded++
+                        out.append(ch)
+                        true
+                    }
+                }
+            }
+
+            !end && size == 0
+        }
+
+        if (!rc && size != 0) throw MalformedInputException(0)
+
+        return rc
+    }
+
+    /** Skips up to [n] bytes and returns the number skipped; a non-positive [n] skips nothing. */
+    override fun skip(n: Int): Int {
+        // a negative count would otherwise move the buffer position backwards
+        if (n <= 0) return 0
+
+        var skipped = 0
+
+        reading(0) {
+            val m = minOf(n - skipped, it.remaining())
+            it.position(it.position() + m)
+            skipped += m
+
+            skipped < n
+        }
+
+        return skipped
+    }
+
+    override fun skipExact(n: Int) {
+        if (skip(n) != n) throw EOFException("Unable to skip $n bytes due to end of packet")
+    }
+
+    /** Returns a stream that consumes this packet. */
+    override fun inputStream(): InputStream {
+        return object : InputStream() {
+            override fun read(): Int {
+                var v: Byte = 0
+                // the lambda must return false to stop after ONE byte; returning true drained the packet
+                val rc = reading(1) { v = it.get(); false }
+                return if (rc) v.toInt() and 0xff else -1
+            }
+
+            // InputStream contract: a zero-length read returns 0, even at end of stream
+            override fun read(b: ByteArray, off: Int, len: Int): Int = if (len == 0) 0 else readLazy(b, off, len)
+
+            override fun skip(n: Long): Long {
+                // clamp instead of truncating so that out-of-range counts cannot wrap around
+                val count = n.coerceIn(0L, Int.MAX_VALUE.toLong()).toInt()
+                return this@ByteReadPacketImpl.skip(count).toLong()
+            }
+
+            override fun available() = remaining
+        }
+    }
+
+    /** Hands every queued buffer back to [pool]. */
+    override fun release() {
+        while (packets.isNotEmpty()) {
+            recycle(packets.remove())
+        }
+    }
+
+    /**
+     * Runs [block] on the queued buffers, front to back, until it returns `false` or the queue is empty.
+     * Before each invocation the current buffer is topped up so that it holds at least [size] bytes;
+     * drained buffers are removed and recycled.
+     *
+     * @return `true` if [block] was invoked at least once, `false` if the packet is empty or
+     * [size] bytes could not be gathered
+     */
+    private inline fun reading(size: Int, block: (ByteBuffer) -> Boolean): Boolean {
+        if (packets.isEmpty()) return false
+
+        var visited = false
+        var buffer = packets.peekFirst()
+        var stop = false
+
+        while (!stop) {
+            if (buffer.hasRemaining()) {
+                if (buffer.remaining() < size) {
+                    if (!tryStealBytesFromNextBuffer(size, buffer)) return false
+                }
+
+                visited = true
+                stop = !block(buffer)
+            }
+
+            if (!buffer.hasRemaining()) {
+                packets.removeFirst()
+                recycle(buffer)
+
+                if (packets.isEmpty()) break
+                buffer = packets.peekFirst()
+            }
+        }
+
+        return visited
+    }
+
+    /**
+     * Moves bytes from the following buffers into [buffer] (the head of the queue) until it holds [size] bytes.
+     *
+     * @return `false`, leaving the queue untouched, when the packet does not contain [size] bytes in total
+     */
+    private fun tryStealBytesFromNextBuffer(size: Int, buffer: ByteBuffer): Boolean {
+        if (packets.size == 1) {
+            return false
+        }
+        // verify before mutating anything: running dry half-way would leave the head buffer detached and unflipped
+        if (remaining < size) {
+            return false
+        }
+
+        packets.removeFirst()
+
+        buffer.compact() // position is now the number of bytes already held
+        var missing = size - buffer.position()
+        while (missing > 0) {
+            val next = packets.peekFirst()
+            val count = minOf(missing, next.remaining())
+            repeat(count) {
+                buffer.put(next.get())
+            }
+            missing -= count
+
+            if (!next.hasRemaining()) {
+                packets.removeFirst()
+                recycle(next)
+            }
+        }
+        buffer.flip()
+
+        packets.addFirst(buffer)
+        return true
+    }
+
+    private fun recycle(buffer: ByteBuffer) {
+        pool.recycle(buffer)
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketSingle.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketSingle.kt
new file mode 100644
index 0000000..5b4c28b
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketSingle.kt
@@ -0,0 +1,200 @@
+package kotlinx.coroutines.experimental.io.packet
+
+import kotlinx.coroutines.experimental.io.internal.*
+import java.io.*
+import java.nio.*
+import java.nio.charset.*
+
+/**
+ * [ByteReadPacket] backed by a single byte buffer. The buffer is handed to [pool] once it has been read
+ * completely or when [release] is called; after that the packet behaves as empty.
+ */
+internal class ByteReadPacketSingle(private var buffer: ByteBuffer?, internal val pool: ObjectPool<ByteBuffer>) : ByteReadPacket {
+    override val remaining: Int
+        get() = buffer?.remaining() ?: 0
+
+    /** Detaches and returns the underlying buffer; throws [IllegalStateException] if it is already gone. */
+    internal fun steal(): ByteBuffer = buffer?.also { buffer = null } ?: throw IllegalStateException("EOF")
+
+    /** Copies up to [length] bytes into [dst]; returns the number copied or -1 when the packet is exhausted. */
+    override fun readLazy(dst: ByteArray, offset: Int, length: Int): Int {
+        var copied = 0
+
+        // no early return from inside the block: that would skip recycling of a drained buffer
+        val rc = reading { buffer ->
+            copied = minOf(buffer.remaining(), length)
+            buffer.get(dst, offset, copied)
+        }
+
+        return if (rc) copied else -1
+    }
+
+    /** Copies as many bytes as fit into [dst]; returns the number copied or -1 when the packet is exhausted. */
+    override fun readLazy(dst: ByteBuffer): Int {
+        var copied = 0
+
+        val rc = reading { buffer ->
+            if (dst.remaining() >= buffer.remaining()) {
+                copied += buffer.remaining()
+                dst.put(buffer)
+            } else {
+                while (dst.hasRemaining() && buffer.hasRemaining()) {
+                    dst.put(buffer.get())
+                    copied++
+                }
+            }
+        }
+
+        return if (rc) copied else -1
+    }
+
+    /** Copies exactly [length] bytes into [dst]; throws [EOFException] if the packet holds fewer. */
+    override fun readFully(dst: ByteArray, offset: Int, length: Int) {
+        val rc = readLazy(dst, offset, length)
+        // readLazy reports -1 on an exhausted packet; that is not an error when nothing was requested
+        if (length > 0 && rc < length) throw EOFException("Not enough bytes in the packet")
+    }
+
+    /** Fills [dst] completely and returns the number of bytes copied; throws [EOFException] if the packet holds fewer. */
+    override fun readFully(dst: ByteBuffer): Int {
+        val rc = readLazy(dst)
+        if (dst.hasRemaining()) throw EOFException("Not enough bytes in the packet")
+        // rc is -1 when dst had no room and the packet is exhausted: zero bytes were copied
+        return maxOf(rc, 0)
+    }
+
+    override fun readLong(): Long {
+        var v = 0L
+        val rc = reading { v = it.getLong() }
+        if (!rc) throw EOFException("Couldn't read long from empty packet")
+        return v
+    }
+
+    override fun readInt(): Int {
+        var v = 0
+        val rc = reading { v = it.getInt() }
+        if (!rc) throw EOFException("Couldn't read int from empty packet")
+        return v
+    }
+
+    override fun readShort(): Short {
+        var v: Short = 0
+        val rc = reading { v = it.getShort() }
+        if (!rc) throw EOFException("Couldn't read short from empty packet")
+        return v
+    }
+
+    override fun readByte(): Byte {
+        var v: Byte = 0
+        val rc = reading { v = it.get() }
+        if (!rc) throw EOFException("Couldn't read byte from empty packet")
+        return v
+    }
+
+    override fun readDouble(): Double {
+        var v = 0.0
+        val rc = reading { v = it.getDouble() }
+        if (!rc) throw EOFException("Couldn't read double from empty packet")
+        return v
+    }
+
+    override fun readFloat(): Float {
+        var v = 0.0f
+        val rc = reading { v = it.getFloat() }
+        if (!rc) throw EOFException("Couldn't read float from empty packet")
+        return v
+    }
+
+    /**
+     * Appends the characters of the next line to [out]. A line ends at `\n`, `\r\n`, or a `\r` that is
+     * followed by any other character (which is left unread, including a second `\r`).
+     *
+     * @return `false` if there was nothing to read
+     * @throws BufferOverflowException if the line has more than [limit] characters
+     * @throws MalformedInputException if the buffer ends in the middle of a character
+     */
+    override fun readUTF8LineTo(out: Appendable, limit: Int): Boolean {
+        var decoded = 0
+        var cr = false
+
+        return reading { bb ->
+            val rc = bb.decodeUTF8 { ch ->
+                when (ch) {
+                    '\r' -> {
+                        if (cr) {
+                            false
+                        } else {
+                            cr = true
+                            true
+                        }
+                    }
+                    '\n' -> false
+                    else -> {
+                        if (cr) {
+                            false
+                        } else {
+                            if (decoded == limit) throw BufferOverflowException()
+                            decoded++
+                            out.append(ch)
+                            true
+                        }
+                    }
+                }
+            }
+
+            if (rc == -1) {
+                // The rejected character was pushed back. Swallow it only if it is the '\n' terminator:
+                // a rejected '\r' is the terminator of the NEXT line and must stay in the buffer.
+                if (bb.get(bb.position()) == 0x0a.toByte()) {
+                    bb.position(bb.position() + 1)
+                }
+            } else if (rc > 0) throw MalformedInputException(0)
+        }
+    }
+
+    /** Skips up to [n] bytes and returns the number skipped; a non-positive [n] skips nothing. */
+    override fun skip(n: Int): Int {
+        // a negative count would otherwise move the buffer position backwards
+        if (n <= 0) return 0
+
+        var skipped = 0
+
+        reading {
+            val m = minOf(n - skipped, it.remaining())
+            it.position(it.position() + m)
+            skipped += m
+        }
+
+        return skipped
+    }
+
+    override fun skipExact(n: Int) {
+        if (skip(n) != n) throw EOFException("Unable to skip $n bytes due to end of packet")
+    }
+
+    /** Returns a stream that consumes this packet. */
+    override fun inputStream(): InputStream {
+        return object : InputStream() {
+            override fun read(): Int {
+                var v: Byte = 0
+                val rc = reading { v = it.get() }
+                return if (rc) v.toInt() and 0xff else -1
+            }
+
+            // InputStream contract: a zero-length read returns 0, even at end of stream
+            override fun read(b: ByteArray, off: Int, len: Int): Int = if (len == 0) 0 else readLazy(b, off, len)
+
+            override fun skip(n: Long): Long {
+                // clamp instead of truncating so that out-of-range counts cannot wrap around
+                val count = n.coerceIn(0L, Int.MAX_VALUE.toLong()).toInt()
+                return this@ByteReadPacketSingle.skip(count).toLong()
+            }
+
+            override fun available() = remaining
+        }
+    }
+
+    /** Hands the buffer back to [pool], if it is still attached. */
+    override fun release() {
+        recycle(buffer ?: return)
+        buffer = null
+    }
+
+    /**
+     * Runs [block] once on the buffer if it still has unread bytes; a buffer that is (or becomes) drained
+     * is detached and recycled.
+     *
+     * @return `true` if [block] was invoked
+     */
+    private inline fun reading(block: (ByteBuffer) -> Unit): Boolean {
+        val buffer = buffer ?: return false
+
+        if (!buffer.hasRemaining()) {
+            this.buffer = null
+            recycle(buffer)
+            return false
+        }
+
+        block(buffer)
+
+        if (!buffer.hasRemaining()) {
+            this.buffer = null
+            recycle(buffer)
+        }
+
+        return true
+    }
+
+    private fun recycle(buffer: ByteBuffer) {
+        pool.recycle(buffer)
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacket.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacket.kt
new file mode 100644
index 0000000..564be91
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacket.kt
@@ -0,0 +1,31 @@
+package kotlinx.coroutines.experimental.io.packet
+
+import java.io.*
+import java.nio.*
+
+/**
+ * Write side of a packet: bytes and text are accumulated with the `write*` / `append` functions and then
+ * turned into a [ByteReadPacket] with [build].
+ *
+ * NOTE(review): the per-member notes are derived from `ByteWritePacketImpl`; confirm before treating them
+ * as a binding contract for other implementations.
+ */
+interface ByteWritePacket : Appendable {
+ /** Appends [length] bytes of [src] starting at [offset]. */
+ fun writeFully(src: ByteArray, offset: Int, length: Int)
+ /** Appends the whole array. */
+ fun writeFully(src: ByteArray) = writeFully(src, 0, src.size)
+ /** Appends all remaining bytes of [src]. */
+ fun writeFully(src: ByteBuffer)
+
+ fun writeLong(l: Long)
+ fun writeInt(i: Int)
+ fun writeShort(s: Short)
+ fun writeByte(b: Byte)
+ fun writeDouble(d: Double)
+ fun writeFloat(f: Float)
+
+ // Text is written char by char; see the implementation for the exact byte encoding.
+ fun writeStringUtf8(s: String)
+ fun writeStringUtf8(cb: CharBuffer)
+ fun writeStringUtf8(cs: CharSequence)
+
+ /** Discards everything written so far (the implementation hands its buffers back to the pool). */
+ fun release()
+ /** Returns the accumulated content as a [ByteReadPacket]. */
+ fun build(): ByteReadPacket
+
+ /** Returns an [OutputStream] view that writes into this packet. */
+ fun outputStream(): OutputStream
+
+ /** Appends the whole of [csq] by delegating to the ranged `append` overload. */
+ override fun append(csq: CharSequence): ByteWritePacket {
+ append(csq, 0, csq.length)
+ return this
+ }
+}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacketImpl.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacketImpl.kt
new file mode 100644
index 0000000..562c27a
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacketImpl.kt
@@ -0,0 +1,208 @@
+package kotlinx.coroutines.experimental.io.packet
+
+import kotlinx.coroutines.experimental.io.internal.*
+import java.io.*
+import java.nio.*
+import java.util.*
+
+/**
+ * [ByteWritePacket] that accumulates its content in buffers borrowed from [pool].
+ *
+ * `buffers` is `null` while nothing has been written, a single `ByteBuffer` while one buffer is enough,
+ * and an `ArrayDeque<ByteBuffer>` once a second buffer has been borrowed.
+ */
+internal class ByteWritePacketImpl(private val pool: ObjectPool<ByteBuffer>) : ByteWritePacket {
+    private var buffers: Any? = null
+
+    override fun writeFully(src: ByteArray, offset: Int, length: Int) {
+        var done = 0
+
+        while (done < length) {
+            write(1) { target ->
+                val chunk = minOf(target.remaining(), length - done)
+                target.put(src, offset + done, chunk)
+                done += chunk
+            }
+        }
+    }
+
+    override fun writeFully(src: ByteBuffer) {
+        while (src.hasRemaining()) {
+            write(1) { target ->
+                if (target.remaining() < src.remaining()) {
+                    // src does not fit: copy byte by byte until the target is full
+                    while (target.hasRemaining() && src.hasRemaining()) {
+                        target.put(src.get())
+                    }
+                } else {
+                    target.put(src)
+                }
+            }
+        }
+    }
+
+    override fun writeLong(l: Long) = write(8) { it.putLong(l) }
+
+    override fun writeInt(i: Int) = write(4) { it.putInt(i) }
+
+    override fun writeShort(s: Short) = write(2) { it.putShort(s) }
+
+    override fun writeByte(b: Byte) = write(1) { it.put(b) }
+
+    override fun writeDouble(d: Double) = write(8) { it.putDouble(d) }
+
+    override fun writeFloat(f: Float) = write(4) { it.putFloat(f) }
+
+    /** Appends one char; three bytes are reserved because that is the longest encoding `putUtf8Char` emits. */
+    override fun append(c: Char): ByteWritePacket {
+        val code = c.toInt() and 0xffff
+        write(3) { target -> target.putUtf8Char(code) }
+        return this
+    }
+
+    /** Appends the chars of [csq] in the range [start] until [end], moving to a new buffer whenever a char does not fit. */
+    override fun append(csq: CharSequence, start: Int, end: Int): ByteWritePacket {
+        val total = end - start
+        var written = 0
+        var needed = 1
+
+        while (written < total) {
+            write(needed) { target ->
+                while (written < total) {
+                    val code = csq[start + written].toInt() and 0xffff
+
+                    needed = if (code in 1..0x7f) 1 else if (code > 0x7ff) 3 else 2
+
+                    // not enough room: leave the char for the next buffer
+                    if (target.remaining() < needed) break
+
+                    target.putUtf8Char(code)
+                    written++
+                }
+            }
+        }
+
+        return this
+    }
+
+    override fun writeStringUtf8(s: String) {
+        append(s, 0, s.length)
+    }
+
+    override fun writeStringUtf8(cs: CharSequence) {
+        append(cs, 0, cs.length)
+    }
+
+    override fun writeStringUtf8(cb: CharBuffer) {
+        append(cb, 0, cb.remaining())
+    }
+
+    /** Writes the 16-bit value [v] in one, two or three bytes; zero takes the two-byte form. */
+    @Suppress("NOTHING_TO_INLINE")
+    private inline fun ByteBuffer.putUtf8Char(v: Int) {
+        if (v in 1..0x7f) {
+            put(v.toByte())
+        } else if (v > 0x7ff) {
+            put((0xe0 or ((v shr 12) and 0x0f)).toByte())
+            put((0x80 or ((v shr 6) and 0x3f)).toByte())
+            put((0x80 or (v and 0x3f)).toByte())
+        } else {
+            put((0xc0 or ((v shr 6) and 0x1f)).toByte())
+            put((0x80 or (v and 0x3f)).toByte())
+        }
+    }
+
+    override fun outputStream(): OutputStream {
+        val packet = this
+        return object : OutputStream() {
+            override fun write(b: Int) {
+                packet.writeByte(b.toByte())
+            }
+
+            override fun write(b: ByteArray, off: Int, len: Int) {
+                packet.writeFully(b, off, len)
+            }
+        }
+    }
+
+    /** Flips the accumulated buffers for reading, wraps them into a read packet and resets this writer. */
+    override fun build(): ByteReadPacket {
+        val collected = buffers ?: return ByteReadPacketEmpty
+        buffers = null
+
+        if (collected !is ArrayDeque<*>) {
+            val only = collected as ByteBuffer
+            only.flip()
+            return ByteReadPacketSingle(only, pool)
+        }
+
+        @Suppress("UNCHECKED_CAST")
+        val queue = collected as ArrayDeque<ByteBuffer>
+        return when (queue.size) {
+            0 -> ByteReadPacketEmpty
+            1 -> {
+                val only = queue.peekFirst()
+                only.flip()
+                ByteReadPacketSingle(only, pool)
+            }
+            else -> {
+                for (each in queue) {
+                    each.flip()
+                }
+                ByteReadPacketImpl(queue, pool)
+            }
+        }
+    }
+
+    /** Hands every accumulated buffer back to [pool] and resets this writer. */
+    override fun release() {
+        val held = buffers ?: return
+        buffers = null
+
+        when (held) {
+            is ArrayDeque<*> -> held.forEach { recycle(it as ByteBuffer) }
+            else -> recycle(held as ByteBuffer)
+        }
+    }
+
+    /** Runs [block] on the last buffer if it has at least [size] bytes free, otherwise on a freshly borrowed one. */
+    private inline fun write(size: Int, block: (ByteBuffer) -> Unit) {
+        val tail = last()
+        val target = if (tail != null && tail.remaining() >= size) {
+            tail
+        } else {
+            val fresh = pool.borrow()
+            last(fresh)
+            fresh.clear()
+            fresh
+        }
+
+        block(target)
+    }
+
+    /** The buffer written to most recently, or `null` if there is none. */
+    private fun last(): ByteBuffer? {
+        val current = buffers
+        @Suppress("UNCHECKED_CAST")
+        return when (current) {
+            is ByteBuffer -> current
+            is ArrayDeque<*> -> {
+                val queue = current as ArrayDeque<ByteBuffer>
+                if (queue.isNotEmpty()) queue.peekLast() else null
+            }
+            else -> null
+        }
+    }
+
+    /** Registers [new] as the last buffer, switching from a single buffer to a deque when needed. */
+    private fun last(new: ByteBuffer) {
+        val current = buffers
+        @Suppress("UNCHECKED_CAST")
+        when (current) {
+            null -> buffers = new
+            is ArrayDeque<*> -> (current as ArrayDeque<ByteBuffer>).addLast(new)
+            else -> {
+                val queue = ArrayDeque<ByteBuffer>()
+                queue.addFirst(current as ByteBuffer)
+                queue.addLast(new)
+                buffers = queue
+            }
+        }
+    }
+
+    private fun recycle(buffer: ByteBuffer) {
+        pool.recycle(buffer)
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/Packets.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/Packets.kt
new file mode 100644
index 0000000..26ebba4
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/Packets.kt
@@ -0,0 +1,22 @@
+package kotlinx.coroutines.experimental.io.packet
+
+import kotlinx.coroutines.experimental.io.internal.ObjectPool
+import kotlinx.coroutines.experimental.io.internal.ObjectPoolImpl
+import kotlinx.coroutines.experimental.io.internal.getIOIntProperty
+import java.nio.ByteBuffer
+
+// Size in bytes of each pooled packet buffer; system property kotlinx.coroutines.io.PacketBufferSize, default 4096.
+private val PACKET_BUFFER_SIZE = getIOIntProperty("PacketBufferSize", 4096)
+// Value passed to the ObjectPoolImpl constructor; system property kotlinx.coroutines.io.PacketBufferPoolSize, default 128.
+private val PACKET_BUFFER_POOL_SIZE = getIOIntProperty("PacketBufferPoolSize", 128)
+
+// Pool used by buildPacket; new instances are direct byte buffers of PACKET_BUFFER_SIZE bytes.
+private val PacketBufferPool: ObjectPool<ByteBuffer> =
+ object : ObjectPoolImpl<ByteBuffer>(PACKET_BUFFER_POOL_SIZE) {
+ override fun produceInstance(): ByteBuffer = ByteBuffer.allocateDirect(PACKET_BUFFER_SIZE)
+ }
+
+/**
+ * Creates a packet by running [block] against a fresh [ByteWritePacket] and building the result.
+ * If [block] throws, the buffers borrowed so far are released back to the pool before the exception
+ * is rethrown, so a failing builder does not leak pooled buffers.
+ */
+fun buildPacket(block: ByteWritePacket.() -> Unit): ByteReadPacket {
+    val builder = ByteWritePacketImpl(PacketBufferPool)
+    try {
+        block(builder)
+    } catch (t: Throwable) {
+        builder.release()
+        throw t
+    }
+    return builder.build()
+}
+
+/**
+ * Reads the next line from the packet via [ByteReadPacket.readUTF8LineTo].
+ *
+ * @param estimate initial capacity of the string builder that collects the line
+ * @param limit maximum number of characters, passed through to `readUTF8LineTo`
+ * @return the line, or `null` when `readUTF8LineTo` reports that nothing was read
+ */
+fun ByteReadPacket.readUTF8Line(estimate: Int = 16, limit: Int = Int.MAX_VALUE): String? {
+    val line = StringBuilder(estimate)
+    if (!readUTF8LineTo(line, limit)) return null
+    return line.toString()
+}
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt
new file mode 100644
index 0000000..6c7021e
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt
@@ -0,0 +1,219 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import org.junit.*
+import java.io.*
+import kotlin.test.*
+
+/**
+ * Scenario tests for reading from a `ByteBufferChannel`. The `expect(n)` / `finish(n)` calls inherited from
+ * `TestBase` pin the order in which the numbered points are reached, so statement order in every test
+ * is significant.
+ */
+class ByteBufferChannelScenarioTest : TestBase() {
+ // channel under test, created with `true` as its only constructor argument
+ private val ch = ByteBufferChannel(true)
+
+ // A reader started before any data is written: it reaches step 5 only after the writer's writeInt (step 4).
+ @Test
+ fun testReadBeforeAvailable() {
+ expect(1)
+
+ runBlocking {
+ launch(coroutineContext) {
+ expect(3)
+
+ val bb = ByteBuffer.allocate(10)
+ val rc = ch.readAvailable(bb) // should suspend
+
+ expect(5)
+ assertEquals(4, rc)
+
+ expect(6)
+ }
+
+ expect(2)
+ yield()
+
+ expect(4)
+ ch.writeInt(0xff) // should resume
+
+ yield()
+
+ finish(7)
+ }
+ }
+
+ // Same scenario as above, using readFully into a 4-byte buffer.
+ @Test
+ fun testReadBeforeAvailable2() {
+ expect(1)
+
+ runBlocking {
+ launch(coroutineContext) {
+ expect(3)
+
+ val bb = ByteBuffer.allocate(4)
+ ch.readFully(bb) // should suspend
+
+ expect(5)
+
+ bb.flip()
+ assertEquals(4, bb.remaining())
+
+ expect(6)
+ }
+
+ expect(2)
+ yield()
+
+ expect(4)
+ ch.writeInt(0xff) // should resume
+
+ yield()
+
+ finish(7)
+ }
+ }
+
+ // Data is written first, so the reader goes from step 3 to step 4 without the writer running in between.
+ @Test
+ fun testReadAfterAvailable() {
+ expect(1)
+
+ runBlocking {
+ ch.writeInt(0xff) // should resume
+
+ launch(coroutineContext) {
+ expect(3)
+
+ val bb = ByteBuffer.allocate(10)
+ val rc = ch.readAvailable(bb) // should NOT suspend
+
+ expect(4)
+ assertEquals(4, rc)
+
+ expect(5)
+ }
+
+ expect(2)
+ yield()
+
+ finish(6)
+ }
+ }
+
+ // Same scenario as above, using readFully into a 4-byte buffer.
+ @Test
+ fun testReadAfterAvailable2() {
+ expect(1)
+
+ runBlocking {
+ ch.writeInt(0xff) // should resume
+
+ launch(coroutineContext) {
+ expect(3)
+
+ val bb = ByteBuffer.allocate(4)
+ ch.readFully(bb) // should NOT suspend
+
+ expect(4)
+ bb.flip()
+ assertEquals(4, bb.remaining())
+
+ expect(5)
+ }
+
+ expect(2)
+ yield()
+
+ finish(6)
+ }
+ }
+
+ // readAvailable into a zero-capacity buffer on an open, empty channel returns 0.
+ @Test
+ fun testReadToEmpty() {
+ runBlocking {
+ expect(1)
+
+ val rc = ch.readAvailable(ByteBuffer.allocate(0))
+
+ expect(2)
+
+ assertEquals(0, rc)
+
+ finish(3)
+ }
+ }
+
+ // After close(IOException()), readAvailable throws the IOException even for a zero-capacity buffer.
+ @Test
+ fun testReadToEmptyFromFailedChannel() {
+ runBlocking {
+ expect(1)
+
+ ch.close(IOException())
+
+ try {
+ ch.readAvailable(ByteBuffer.allocate(0))
+ fail("Should throw exception")
+ } catch (expected: IOException) {
+ }
+
+ finish(2)
+ }
+ }
+
+ // After a normal close(), readAvailable returns -1.
+ @Test
+ fun testReadToEmptyFromClosedChannel() {
+ runBlocking {
+ expect(1)
+
+ ch.close()
+
+ val rc = ch.readAvailable(ByteBuffer.allocate(0))
+
+ expect(2)
+
+ assertEquals(-1, rc)
+
+ finish(3)
+ }
+ }
+
+ // After a normal close(), readFully into a zero-capacity buffer completes without throwing.
+ @Test
+ fun testReadFullyToEmptyFromClosedChannel() {
+ runBlocking {
+ expect(1)
+
+ ch.close()
+
+ ch.readFully(ByteBuffer.allocate(0))
+
+ finish(2)
+ }
+ }
+
+ // After a normal close(), readFully of one byte throws ClosedReceiveChannelException.
+ @Test
+ fun testReadFullyFromClosedChannel() {
+ runBlocking {
+ expect(1)
+
+ ch.close()
+ try {
+ ch.readFully(ByteBuffer.allocate(1))
+ fail("Should throw exception")
+ } catch (expected: ClosedReceiveChannelException) {
+ }
+
+ finish(2)
+ }
+ }
+
+ // After close(IOException()), readFully throws the IOException even for a zero-capacity buffer.
+ @Test
+ fun testReadFullyToEmptyFromFailedChannel() {
+ runBlocking {
+ expect(1)
+
+ ch.close(IOException())
+
+ try {
+ ch.readFully(ByteBuffer.allocate(0))
+ fail("Should throw exception")
+ } catch (expected: IOException) {
+ }
+
+ finish(2)
+ }
+ }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt
new file mode 100644
index 0000000..00ab358
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt
@@ -0,0 +1,604 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.io.internal.*
+import kotlinx.coroutines.experimental.io.packet.*
+import org.junit.*
+import org.junit.rules.*
+import java.nio.*
+import java.nio.ByteBuffer
+import java.util.*
+import java.util.concurrent.*
+import kotlin.test.*
+
+class ByteBufferChannelTest {
+ @get:Rule
+ val timeout = Timeout(10, TimeUnit.SECONDS)
+
+ @get:Rule
+ val failures = ErrorCollector()
+
+ private val Size = BUFFER_SIZE - RESERVED_SIZE
+ private val ch = ByteBufferChannel(autoFlush = false, pool = BufferObjectNoPool)
+
+ @Test
+ fun testBoolean() {
+ runBlocking {
+ ch.writeBoolean(true)
+ ch.flush()
+ assertEquals(true, ch.readBoolean())
+
+ ch.writeBoolean(false)
+ ch.flush()
+ assertEquals(false, ch.readBoolean())
+ }
+ }
+
+ @Test
+ fun testByte() {
+ runBlocking {
+ assertEquals(0, ch.availableForRead)
+ ch.writeByte(-1)
+ ch.flush()
+ assertEquals(1, ch.availableForRead)
+ assertEquals(-1, ch.readByte())
+ assertEquals(0, ch.availableForRead)
+ }
+ }
+
+ @Test
+ fun testShortB() {
+ runBlocking {
+ ch.readByteOrder = ByteOrder.BIG_ENDIAN
+ ch.writeByteOrder = ByteOrder.BIG_ENDIAN
+
+ assertEquals(0, ch.availableForRead)
+ ch.writeShort(-1)
+ assertEquals(0, ch.availableForRead)
+ ch.flush()
+ assertEquals(2, ch.availableForRead)
+ assertEquals(-1, ch.readShort())
+ assertEquals(0, ch.availableForRead)
+ }
+ }
+
+ @Test
+ fun testShortL() {
+ runBlocking {
+ ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
+ ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
+
+ assertEquals(0, ch.availableForRead)
+ ch.writeShort(-1)
+ assertEquals(0, ch.availableForRead)
+ ch.flush()
+ assertEquals(2, ch.availableForRead)
+ assertEquals(-1, ch.readShort())
+ assertEquals(0, ch.availableForRead)
+ }
+ }
+
+ @Test
+ fun testShortEdge() {
+ runBlocking {
+ ch.writeByte(1)
+
+ for (i in 2 until Size step 2) {
+ ch.writeShort(0x00ee)
+ }
+
+ ch.flush()
+
+ ch.readByte()
+ ch.writeShort(0x1234)
+
+ ch.flush()
+
+ while (ch.availableForRead > 2) {
+ ch.readShort()
+ }
+
+ assertEquals(0x1234, ch.readShort())
+ }
+ }
+
+ @Test
+ fun testIntB() {
+ runBlocking {
+ ch.readByteOrder = ByteOrder.BIG_ENDIAN
+ ch.writeByteOrder = ByteOrder.BIG_ENDIAN
+
+ assertEquals(0, ch.availableForRead)
+ ch.writeInt(-1)
+ ch.flush()
+ assertEquals(4, ch.availableForRead)
+ assertEquals(-1, ch.readInt())
+ assertEquals(0, ch.availableForRead)
+ }
+ }
+
+ @Test
+ fun testIntL() {
+ runBlocking {
+ ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
+ ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
+
+ assertEquals(0, ch.availableForRead)
+ ch.writeInt(-1)
+ ch.flush()
+ assertEquals(4, ch.availableForRead)
+ assertEquals(-1, ch.readInt())
+ assertEquals(0, ch.availableForRead)
+ }
+ }
+
+ @Test
+ fun testIntEdge() {
+ runBlocking {
+ for (shift in 1..3) {
+ for (i in 1..shift) {
+ ch.writeByte(1)
+ }
+
+ repeat(Size / 4 - 1) {
+ ch.writeInt(0xeeeeeeeeL)
+ }
+
+ ch.flush()
+
+ for (i in 1..shift) {
+ ch.readByte()
+ }
+
+ ch.writeInt(0x12345678)
+
+ ch.flush()
+
+ while (ch.availableForRead > 4) {
+ ch.readInt()
+ }
+
+ assertEquals(0x12345678, ch.readInt())
+ }
+ }
+ }
+
+ @Test
+ fun testLongB() {
+ runBlocking {
+ ch.readByteOrder = ByteOrder.BIG_ENDIAN
+ ch.writeByteOrder = ByteOrder.BIG_ENDIAN
+
+ assertEquals(0, ch.availableForRead)
+ ch.writeLong(Long.MIN_VALUE)
+ ch.flush()
+ assertEquals(8, ch.availableForRead)
+ assertEquals(Long.MIN_VALUE, ch.readLong())
+ assertEquals(0, ch.availableForRead)
+ }
+ }
+
+ @Test
+ fun testLongL() {
+ runBlocking {
+ ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
+ ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
+
+ assertEquals(0, ch.availableForRead)
+ ch.writeLong(Long.MIN_VALUE)
+ ch.flush()
+ assertEquals(8, ch.availableForRead)
+ assertEquals(Long.MIN_VALUE, ch.readLong())
+ assertEquals(0, ch.availableForRead)
+ }
+ }
+
+ @Test
+ fun testLongEdge() {
+ runBlocking {
+ for (shift in 1..7) {
+ for (i in 1..shift) {
+ ch.writeByte(1)
+ }
+
+ repeat(Size / 8 - 1) {
+ ch.writeLong(0x11112222eeeeeeeeL)
+ }
+
+ ch.flush()
+ for (i in 1..shift) {
+ ch.readByte()
+ }
+
+ ch.writeLong(0x1234567812345678L)
+ ch.flush()
+
+ while (ch.availableForRead > 8) {
+ ch.readLong()
+ }
+
+ assertEquals(0x1234567812345678L, ch.readLong())
+ }
+ }
+ }
+
+ @Test
+ fun testDoubleB() {
+ runBlocking {
+ ch.readByteOrder = ByteOrder.BIG_ENDIAN
+ ch.writeByteOrder = ByteOrder.BIG_ENDIAN
+
+ assertEquals(0, ch.availableForRead)
+ ch.writeDouble(1.05)
+ ch.flush()
+
+ assertEquals(8, ch.availableForRead)
+ assertEquals(1.05, ch.readDouble())
+ assertEquals(0, ch.availableForRead)
+ }
+ }
+
+ @Test
+ fun testDoubleL() {
+ runBlocking {
+ ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
+ ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
+
+ assertEquals(0, ch.availableForRead)
+ ch.writeDouble(1.05)
+ ch.flush()
+
+ assertEquals(8, ch.availableForRead)
+ assertEquals(1.05, ch.readDouble())
+ assertEquals(0, ch.availableForRead)
+ }
+ }
+
+ @Test
+ fun testFloatB() {
+ runBlocking {
+ ch.readByteOrder = ByteOrder.BIG_ENDIAN
+ ch.writeByteOrder = ByteOrder.BIG_ENDIAN
+
+ assertEquals(0, ch.availableForRead)
+ ch.writeFloat(1.05f)
+ ch.flush()
+
+ assertEquals(4, ch.availableForRead)
+ assertEquals(1.05f, ch.readFloat())
+ assertEquals(0, ch.availableForRead)
+ }
+ }
+
+ @Test
+ fun testFloatL() {
+ runBlocking {
+ ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
+ ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
+
+ assertEquals(0, ch.availableForRead)
+ ch.writeFloat(1.05f)
+ ch.flush()
+
+ assertEquals(4, ch.availableForRead)
+ assertEquals(1.05f, ch.readFloat())
+ assertEquals(0, ch.availableForRead)
+ }
+ }
+
+
+
+ @Test
+ fun testEndianMix() {
+ val byteOrders = listOf(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN)
+ runBlocking {
+ for (writeOrder in byteOrders) {
+ ch.writeByteOrder = writeOrder
+ // Pair the current write order with every read order (four combinations in total).
+ for (readOrder in byteOrders) {
+ ch.readByteOrder = readOrder
+ // Short 0x001f: matching orders round-trip, mismatched orders read the bytes swapped (0x1f00).
+ assertEquals(0, ch.availableForRead)
+ ch.writeShort(0x001f)
+ ch.flush()
+ if (writeOrder == readOrder)
+ assertEquals(0x001f, ch.readShort())
+ else
+ assertEquals(0x1f00, ch.readShort())
+ // NOTE(review): same arguments as the short check above -- possibly meant to cover another writeShort overload; confirm
+ assertEquals(0, ch.availableForRead)
+ ch.writeShort(0x001f)
+ ch.flush()
+ if (writeOrder == readOrder)
+ assertEquals(0x001f, ch.readShort())
+ else
+ assertEquals(0x1f00, ch.readShort())
+ // Int written from an Int literal.
+ assertEquals(0, ch.availableForRead)
+ ch.writeInt(0x1f)
+ ch.flush()
+ if (writeOrder == readOrder)
+ assertEquals(0x0000001f, ch.readInt())
+ else
+ assertEquals(0x1f000000, ch.readInt())
+ // Int again, this time passing a Long literal (0x1fL) to writeInt; the expected values are the same.
+ assertEquals(0, ch.availableForRead)
+ ch.writeInt(0x1fL)
+ ch.flush()
+ if (writeOrder == readOrder)
+ assertEquals(0x0000001f, ch.readInt())
+ else
+ assertEquals(0x1f000000, ch.readInt())
+ // Long: with mismatched orders 0x1f is read back in the highest byte instead of the lowest.
+ assertEquals(0, ch.availableForRead)
+ ch.writeLong(0x1f)
+ ch.flush()
+ if (writeOrder == readOrder)
+ assertEquals(0x1f, ch.readLong())
+ else
+ assertEquals(0x1f00000000000000L, ch.readLong())
+ }
+ }
+ }
+ }
+
+ @Test
+ fun testClose() {
+ runBlocking {
+ ch.writeByte(1)
+ ch.writeByte(2)
+ ch.writeByte(3)
+
+ ch.flush()
+ assertEquals(1, ch.readByte())
+ ch.close()
+
+ assertEquals(2, ch.readByte())
+ assertEquals(3, ch.readByte())
+
+ try {
+ ch.readByte()
+ fail()
+ } catch (expected: ClosedReceiveChannelException) {
+ }
+ }
+ }
+
+ @Test
+ fun testReadAndWriteFully() {
+ runBlocking {
+ val bytes = byteArrayOf(1, 2, 3, 4, 5)
+ val dst = ByteArray(5)
+
+ ch.writeFully(bytes)
+ ch.flush()
+ assertEquals(5, ch.availableForRead)
+ ch.readFully(dst)
+ assertTrue { dst.contentEquals(bytes) }
+
+ ch.writeFully(bytes)
+ ch.flush()
+
+ val dst2 = ByteArray(4)
+ ch.readFully(dst2)
+
+ assertEquals(1, ch.availableForRead)
+ assertEquals(5, ch.readByte())
+
+ ch.close()
+
+ try {
+ ch.readFully(dst)
+ fail("")
+ } catch (expected: ClosedReceiveChannelException) {
+ }
+ }
+ }
+
+ @Test
+ fun testReadAndWriteFullyByteBuffer() {
+ runBlocking {
+ val bytes = byteArrayOf(1, 2, 3, 4, 5)
+ val dst = ByteArray(5)
+
+ ch.writeFully(ByteBuffer.wrap(bytes))
+ ch.flush()
+ assertEquals(5, ch.availableForRead)
+ ch.readFully(ByteBuffer.wrap(dst))
+ assertTrue { dst.contentEquals(bytes) }
+
+ ch.writeFully(ByteBuffer.wrap(bytes))
+ ch.flush()
+
+ val dst2 = ByteArray(4)
+ ch.readFully(ByteBuffer.wrap(dst2))
+
+ assertEquals(1, ch.availableForRead)
+ assertEquals(5, ch.readByte())
+
+ ch.close()
+
+ try {
+ ch.readFully(ByteBuffer.wrap(dst))
+ fail("")
+ } catch (expected: ClosedReceiveChannelException) {
+ }
+ }
+ }
+
+ @Test
+ fun testReadAndWritePartially() {
+ runBlocking {
+ val bytes = byteArrayOf(1, 2, 3, 4, 5)
+
+ assertEquals(5, ch.writeAvailable(bytes))
+ ch.flush()
+ assertEquals(5, ch.readAvailable(ByteArray(100)))
+
+ repeat(Size / bytes.size) {
+ assertNotEquals(0, ch.writeAvailable(bytes))
+ ch.flush()
+ }
+
+ ch.readAvailable(ByteArray(ch.availableForRead - 1))
+ assertEquals(1, ch.readAvailable(ByteArray(100)))
+
+ ch.close()
+ }
+ }
+
+ @Test
+ fun testReadAndWritePartiallyByteBuffer() {
+     runBlocking {
+         val bytes = byteArrayOf(1, 2, 3, 4, 5)
+         // A 5-byte write is accepted in full, and a 100-byte destination receives exactly those 5 bytes.
+         assertEquals(5, ch.writeAvailable(ByteBuffer.wrap(bytes)))
+         ch.flush()
+         assertEquals(5, ch.readAvailable(ByteBuffer.allocate(100)))
+         // Keep offering 5-byte chunks; no attempt may report zero bytes written.
+         repeat(Size / bytes.size) {
+             assertNotEquals(0, ch.writeAvailable(ByteBuffer.wrap(bytes)))
+             ch.flush()
+         }
+         // Drain all but one byte through the ByteBuffer overload, since this is the ByteBuffer variant of the test.
+         ch.readAvailable(ByteBuffer.allocate(ch.availableForRead - 1))
+         assertEquals(1, ch.readAvailable(ByteBuffer.allocate(100)))
+
+         ch.close()
+     }
+ }
+
+
+ @Test
+ fun testReadAndWriteBig() {
+ val count = 200
+ val bytes = ByteArray(65536)
+ Random().nextBytes(bytes)
+
+ launch(CommonPool + CoroutineName("writer")) {
+ for (i in 1..count) {
+ ch.writeFully(bytes)
+ ch.flush()
+ }
+ }.invokeOnCompletion { t ->
+ if (t != null) {
+ failures.addError(t)
+ }
+ }
+
+ runBlocking(CoroutineName("reader")) {
+ val dst = ByteArray(bytes.size)
+ for (i in 1..count) {
+ ch.readFully(dst)
+ assertTrue { dst.contentEquals(bytes) }
+ dst.fill(0)
+ }
+ }
+ }
+
+ @Test
+ fun testReadAndWriteBigByteBuffer() {
+ val count = 200
+ val bytes = ByteArray(65536)
+ Random().nextBytes(bytes)
+
+ launch(CommonPool + CoroutineName("writer")) {
+ for (i in 1..count) {
+ ch.writeFully(ByteBuffer.wrap(bytes))
+ ch.flush()
+ }
+ }.invokeOnCompletion { t ->
+ if (t != null) {
+ failures.addError(t)
+ }
+ }
+
+ runBlocking(CoroutineName("reader")) {
+ val dst = ByteArray(bytes.size)
+ for (i in 1..count) {
+ ch.readFully(ByteBuffer.wrap(dst))
+ assertTrue { dst.contentEquals(bytes) }
+ dst.fill(0)
+ }
+ }
+ }
+
+ @Test
+ fun testPacket() = runBlocking {
+ val packet = buildPacket {
+ writeInt(0xffee)
+ writeStringUtf8("Hello")
+ }
+
+ ch.writeInt(packet.remaining)
+ ch.writePacket(packet)
+
+ ch.flush()
+
+ val size = ch.readInt()
+ val readed = ch.readPacket(size)
+
+ assertEquals(0xffee, readed.readInt())
+ assertEquals("Hello", readed.readUTF8Line())
+ }
+
+ @Test
+ fun testBigPacket() = runBlocking {
+     // The writer runs concurrently on CommonPool under the coroutine name "writer".
+     launch(CommonPool + CoroutineName("writer")) {
+         val packet = buildPacket {
+             writeInt(0xffee)
+             writeStringUtf8(".".repeat(8192))
+         }
+         // Send the packet size first so the reader knows how many bytes to request from readPacket.
+         ch.writeInt(packet.remaining)
+         ch.writePacket(packet)
+         ch.flush()
+     }.invokeOnCompletion { t -> if (t != null) failures.addError(t) } // report writer failures like testReadAndWriteBig does
+
+     val size = ch.readInt()
+     val readed = ch.readPacket(size)
+
+     assertEquals(0xffee, readed.readInt())
+     assertEquals(".".repeat(8192), readed.readUTF8Line())
+ }
+
+ @Test
+ fun testWriteString() = runBlocking {
+ ch.writeStringUtf8("abc")
+ ch.close()
+
+ assertEquals("abc", ch.readASCIILine())
+ }
+
+ @Test
+ fun testWriteCharSequence() = runBlocking {
+ ch.writeStringUtf8("abc" as CharSequence)
+ ch.close()
+
+ assertEquals("abc", ch.readASCIILine())
+ }
+
+ @Test
+ fun testWriteCharBuffer() = runBlocking {
+ val cb = CharBuffer.allocate(6)
+
+ for (i in 0 until cb.remaining()) {
+ cb.put(i, ' ')
+ }
+
+ cb.position(2)
+ cb.put(2, 'a')
+ cb.put(3, 'b')
+ cb.put(4, 'c')
+ cb.limit(5)
+
+ assertEquals("abc", cb.slice().toString())
+
+ ch.writeStringUtf8(cb)
+ ch.close()
+
+ assertEquals("abc", ch.readASCIILine())
+ }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/BytePacketBuildTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/BytePacketBuildTest.kt
new file mode 100644
index 0000000..b7de477
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/BytePacketBuildTest.kt
@@ -0,0 +1,63 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.io.packet.*
+import org.junit.*
+import kotlin.test.*
+
+class BytePacketBuildTest {
+ @Test
+ fun smokeSingleBufferTest() {
+ val p = buildPacket {
+ writeByte(0x12)
+ writeShort(0x1234)
+ writeInt(0x12345678)
+ writeDouble(1.23)
+ writeFloat(1.23f)
+ writeLong(0x123456789abcdef0)
+
+ writeStringUtf8("OK\n")
+ listOf(1, 2, 3).joinTo(this, separator = "|")
+ }
+
+ assertEquals(1 + 2 + 4 + 8 + 4 + 8 + 3 + 5, p.remaining)
+
+ assertEquals(0x12, p.readByte())
+ assertEquals(0x1234, p.readShort())
+ assertEquals(0x12345678, p.readInt())
+ assertEquals(1.23, p.readDouble())
+ assertEquals(1.23f, p.readFloat())
+ assertEquals(0x123456789abcdef0, p.readLong())
+
+ assertEquals("OK", p.readUTF8Line())
+ assertEquals("1|2|3", p.readUTF8Line())
+ }
+
+ @Test
+ fun smokeMultiBufferTest() {
+ val p = buildPacket {
+ writeFully(ByteArray(9999))
+ writeByte(0x12)
+ writeShort(0x1234)
+ writeInt(0x12345678)
+ writeDouble(1.23)
+ writeFloat(1.23f)
+ writeLong(0x123456789abcdef0)
+
+ writeStringUtf8("OK\n")
+ listOf(1, 2, 3).joinTo(this, separator = "|")
+ }
+
+ assertEquals(9999 + 1 + 2 + 4 + 8 + 4 + 8 + 3 + 5, p.remaining)
+
+ p.readFully(ByteArray(9999))
+ assertEquals(0x12, p.readByte())
+ assertEquals(0x1234, p.readShort())
+ assertEquals(0x12345678, p.readInt())
+ assertEquals(1.23, p.readDouble())
+ assertEquals(1.23f, p.readFloat())
+ assertEquals(0x123456789abcdef0, p.readLong())
+
+ assertEquals("OK", p.readUTF8Line())
+ assertEquals("1|2|3", p.readUTF8Line())
+ }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ContentByteBufferTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ContentByteBufferTest.kt
new file mode 100644
index 0000000..2b9547e
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ContentByteBufferTest.kt
@@ -0,0 +1,53 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.runBlocking
+import org.junit.Test
+import java.util.*
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+
+class ContentByteBufferTest {
+ @Test
+ fun testEmptyContent() = runBlocking {
+ val ch = ByteReadChannel(ByteArray(0))
+ assertEquals(0, ch.availableForRead)
+ assertEquals(-1, ch.readAvailable(ByteBuffer.allocate(100)))
+ assertTrue { ch.isClosedForRead }
+ }
+
+ @Test
+ fun testSingleByteContent() = runBlocking {
+ val ch = ByteReadChannel(byteArrayOf(1))
+ assertEquals(1, ch.availableForRead)
+ assertFalse { ch.isClosedForRead }
+ assertEquals(1, ch.readAvailable(ByteBuffer.allocate(100)))
+ assertEquals(0, ch.availableForRead)
+ assertTrue { ch.isClosedForRead }
+ }
+
+ @Test
+ fun testSingleByteContent2() = runBlocking {
+ val ch = ByteReadChannel(byteArrayOf(0x34))
+ assertEquals(1, ch.availableForRead)
+ assertFalse { ch.isClosedForRead }
+ assertEquals(0x34, ch.readByte())
+ assertEquals(0, ch.availableForRead)
+ assertTrue { ch.isClosedForRead }
+ }
+
+ @Test
+ fun testMultipleByteContent2() = runBlocking {
+ val arr = ByteArray(16)
+ Random().nextBytes(arr)
+ val ch = ByteReadChannel(arr)
+ assertEquals(16, ch.availableForRead)
+ assertFalse { ch.isClosedForRead }
+ ch.readByte()
+ ch.readShort()
+ ch.readInt()
+ ch.readLong()
+ ch.readByte()
+ assertTrue { ch.isClosedForRead }
+ }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt
new file mode 100644
index 0000000..0ba8742
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt
@@ -0,0 +1,74 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+import java.io.*
+import kotlin.test.*
+
+class CopyAndCloseTest : TestBase() {
+ private val from = ByteChannel(true)
+ private val to = ByteChannel(true)
+
+ @Test
+ fun smokeTest() = runBlocking {
+ expect(1)
+
+ launch(coroutineContext) {
+ expect(2)
+ val copied = from.copyAndClose(to) // should suspend
+
+ expect(7)
+
+ assertEquals(8, copied)
+ }
+
+ yield()
+
+ expect(3)
+ from.writeInt(1)
+ expect(4)
+ from.writeInt(2)
+ expect(5)
+
+ yield()
+ expect(6)
+
+ from.close()
+ yield()
+
+ finish(8)
+ }
+
+ @Test
+ fun failurePropagation() = runBlocking {
+ expect(1)
+
+ launch(coroutineContext) {
+ expect(2)
+
+ try {
+ from.copyAndClose(to) // should suspend and then throw IOException
+ fail("Should rethrow exception")
+ } catch (expected: IOException) {
+ }
+
+ expect(4)
+ }
+
+ yield()
+ expect(3)
+
+ from.close(IOException())
+ yield()
+
+ expect(5)
+
+ try {
+ to.readInt()
+ fail("Should throw exception")
+ } catch (expected: IOException) {
+ }
+
+ finish(6)
+ }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/PooledBufferTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/PooledBufferTest.kt
new file mode 100644
index 0000000..d8d15a0
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/PooledBufferTest.kt
@@ -0,0 +1,96 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.io.internal.ObjectPool
+import kotlinx.coroutines.experimental.io.internal.ReadWriteBufferState
+import kotlinx.coroutines.experimental.runBlocking
+import org.junit.After
+import org.junit.Test
+import java.io.*
+import java.nio.ByteBuffer
+import java.util.concurrent.CopyOnWriteArrayList
+import kotlin.test.assertEquals
+import kotlin.test.assertTrue
+import kotlin.test.fail
+
+class PooledBufferTest {
+ private val allocated = CopyOnWriteArrayList<ByteBuffer>()
+
+ private inner class TestPool : ObjectPool<ReadWriteBufferState.Initial> { // pool stub that records outstanding buffers in `allocated`
+ override val capacity: Int get() = 0
+ // Creates a new state over a fresh 4096-byte buffer on every call and records its backing buffer as outstanding.
+ override fun borrow(): ReadWriteBufferState.Initial {
+ val buffer = ReadWriteBufferState.Initial(ByteBuffer.allocate(4096))
+ allocated.add(buffer.backingBuffer)
+ return buffer
+ }
+ // Drops a returned buffer from `allocated`; fails the test when no matching entry is currently recorded.
+ override fun recycle(instance: ReadWriteBufferState.Initial) {
+ if (!allocated.remove(instance.backingBuffer)) { // NOTE(review): remove() matches via ByteBuffer.equals (content-based), not identity
+ fail("Couldn't release buffer from pool")
+ }
+ }
+ // Intentionally empty: nothing is released on dispose.
+ override fun dispose() {
+ }
+ }
+
+ private val channel = ByteBufferChannel(autoFlush = true, pool = TestPool())
+
+ @After
+ fun tearDown() {
+ assertTrue { allocated.isEmpty() }
+ }
+
+ @Test
+ fun testWriteReadClose() {
+ runBlocking {
+ channel.writeInt(1)
+ assertEquals(1, allocated.size)
+ channel.readInt()
+ channel.close()
+ assertEquals(0, allocated.size)
+ }
+ }
+
+ @Test
+ fun testWriteCloseRead() {
+ runBlocking {
+ channel.writeInt(1)
+ assertEquals(1, allocated.size)
+ channel.close()
+ channel.readInt()
+ assertEquals(0, allocated.size)
+ }
+ }
+
+ @Test
+ fun testWriteCloseReadRead() {
+ runBlocking {
+ channel.writeInt(1)
+ assertEquals(1, allocated.size)
+ channel.close()
+ channel.readShort()
+ assertEquals(1, allocated.size)
+ channel.readShort()
+ assertEquals(0, allocated.size)
+ }
+ }
+
+ @Test
+ fun testCloseOnly() {
+ runBlocking {
+ channel.close()
+ assertEquals(0, allocated.size)
+ }
+ }
+
+ @Test
+ fun testCloseWithEerror() {
+ runBlocking {
+ channel.writeFully("OK".toByteArray())
+ assertEquals(1, allocated.size)
+ channel.close(IOException())
+ assertEquals(0, allocated.size)
+ }
+ }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/StringScenarioTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/StringScenarioTest.kt
new file mode 100644
index 0000000..0056874
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/StringScenarioTest.kt
@@ -0,0 +1,268 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+import org.junit.rules.*
+import java.util.concurrent.*
+import kotlin.test.*
+
+class StringScenarioTest : TestBase() {
+ @get:Rule
+ val timeout = Timeout(10L, TimeUnit.SECONDS)
+
+ private val ch = ByteBufferChannel(autoFlush = true)
+
+ @Test
+ fun testWriteCharByChar() {
+ runBlocking {
+ expect(1)
+
+ launch(coroutineContext) {
+ ch.writeStringUtf8("A")
+ expect(3)
+ yield()
+
+ expect(5)
+ ch.writeStringUtf8("B")
+ expect(6)
+ yield()
+
+ expect(7)
+ ch.writeStringUtf8("\n")
+ expect(8)
+ yield()
+ }
+
+ expect(2)
+ yield()
+
+ expect(4)
+ val line = ch.readUTF8Line()
+
+ assertEquals("AB", line)
+ expect(9)
+
+ yield()
+ expect(10)
+ finish(11)
+ }
+ }
+
+ @Test
+ fun testSplitUtf8() {
+ runBlocking {
+ val sb = StringBuilder()
+
+ expect(1)
+
+ launch(coroutineContext) {
+ expect(2)
+ val b = byteArrayOf(0xd0.toByte(), 0x9a.toByte(), 0x0a)
+ ch.writeFully(b, 0, 1)
+ yield()
+
+ expect(3)
+ assertTrue { sb.isEmpty() }
+
+ ch.writeFully(b, 1, 1)
+ yield()
+
+ expect(4)
+ assertEquals("\u041a", sb.toString())
+
+ ch.writeFully(b, 2, 1)
+ yield()
+ }
+
+ ch.readUTF8LineTo(sb)
+ expect(5)
+
+ assertEquals("\u041a", sb.toString())
+
+ finish(6)
+ }
+ }
+
+ @Test
+ fun testSplitLineDelimiter() = runBlocking {
+ expect(1)
+
+ launch(coroutineContext) {
+ expect(2)
+ ch.writeFully("ABC\r".toByteArray())
+ expect(3)
+ yield()
+
+ expect(5)
+ ch.writeFully("\n".toByteArray())
+ yield()
+ }
+
+ yield()
+
+ expect(4)
+ val line = ch.readASCIILine()
+ expect(6)
+
+ assertEquals("ABC", line)
+
+ finish(7)
+ }
+
+ @Test
+ fun testReadTailWriteFirst() = runBlocking {
+ expect(1)
+
+ launch(coroutineContext) {
+ expect(2)
+
+ ch.writeFully("ABC".toByteArray())
+
+ yield()
+
+ expect(4)
+ ch.close()
+ yield()
+ }
+
+ yield()
+
+ expect(3)
+
+ val line = ch.readUTF8Line()
+ expect(5)
+ assertEquals("ABC", line)
+
+ finish(6)
+ }
+
+ @Test
+ fun testReadTailReadFirst() = runBlocking {
+ expect(1)
+
+ launch(coroutineContext) {
+ expect(3)
+
+ ch.writeFully("ABC".toByteArray())
+
+ yield()
+
+ expect(4)
+ ch.close()
+ yield()
+ }
+
+ expect(2)
+
+ val line = ch.readUTF8Line()
+ expect(5)
+ assertEquals("ABC", line)
+
+ finish(6)
+ }
+
+ @Test
+ fun testReadThroughWrap() = runBlocking {
+     val L = ".".repeat(128)
+
+     expect(1)
+     // Launched in the same context; per the expect() numbering it runs to completion during the yield() below.
+     launch(coroutineContext) {
+         expect(2)
+
+         ch.writeFully(ByteArray(4000))
+
+         expect(3)
+         ch.readFully(ByteArray(3999)) // keep one byte remaining to keep buffer unreleased
+
+         expect(4)
+         // NOTE(review): presumably 4000 + 128 bytes passes the end of the buffer so the line wraps -- confirm against BUFFER_SIZE
+         ch.writeFully(L.toByteArray())
+
+         expect(5)
+         ch.close()
+     }
+
+     yield()
+
+     expect(6)
+     // Consume the single byte left over from the 4000-byte write, then read the line (no '\n' was written before close).
+     ch.readByte()
+     expect(7)
+
+     val line = ch.readUTF8Line()
+
+     assertEquals(L, line)
+     // Signal completion only after the final assertion, as the other tests in this class do.
+     finish(8)
+ }
+
+ @Test
+ fun testReadShifted() = runBlocking {
+ val L = ".".repeat(127) + "\n"
+ var base = 0
+
+ for (shift in 1..4096 - 8) {
+ expect(base + 1)
+
+ launch(coroutineContext) {
+ expect(base + 2)
+
+ ch.writeFully(ByteArray(shift))
+
+ expect(base + 3)
+ ch.readFully(ByteArray(shift - 1)) // keep one byte remaining to keep buffer unreleased
+
+ expect(base + 4)
+
+ ch.writeFully(L.toByteArray())
+
+ expect(base + 5)
+ }
+
+ yield()
+
+ expect(base + 6)
+
+ ch.readByte()
+ expect(base + 7)
+
+ val line = ch.readUTF8Line()
+
+ expect(base + 8)
+
+ assertEquals(L.dropLast(1), line)
+
+ base += 8
+ }
+
+ finish(base + 1)
+ }
+
+ @Test
+ fun writeLongLine() = runBlocking {
+ val L = ".".repeat(16384)
+
+ expect(1)
+
+ launch(coroutineContext) {
+ expect(2)
+
+ ch.writeFully(L.toByteArray())
+
+ expect(4)
+ ch.close()
+ }
+
+ yield()
+
+ expect(3)
+ val line = ch.readUTF8Line()
+
+ expect(5)
+
+ assertEquals(L, line)
+
+ finish(6)
+ }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/StringsTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/StringsTest.kt
new file mode 100644
index 0000000..49a3656
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/StringsTest.kt
@@ -0,0 +1,301 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+import org.junit.rules.*
+import java.util.*
+import java.util.concurrent.*
+import kotlin.test.*
+
+class StringsTest {
+ @get:Rule
+ val timeout = Timeout(10, TimeUnit.SECONDS)
+
+ private val channel = ByteBufferChannel(autoFlush = true)
+
+ @Test
+ fun testReadString() {
+ runBlocking {
+ writeString("Hello, World!")
+ channel.close()
+ assertEquals("Hello, World!", channel.readASCIILine())
+ }
+ }
+
+ @Test
+ fun testReadLines1() {
+ testReadLine("\r", "", "")
+ }
+
+ @Test
+ fun testReadLinesCases() {
+ testReadLine("abc", "abc", "")
+ testReadLine("", null, "")
+
+ testReadLine("\n", "", "")
+ testReadLine("\r", "", "")
+ testReadLine("\r\n", "", "")
+ testReadLine("1\n", "1", "")
+ testReadLine("1\r", "1", "")
+ testReadLine("1\r\n", "1", "")
+
+ testReadLine("\n2", "", "2")
+ testReadLine("\r2", "", "2")
+ testReadLine("\r\n2", "", "2")
+ testReadLine("1\n2", "1", "2")
+ testReadLine("1\r2", "1", "2")
+ testReadLine("1\r\n2", "1", "2")
+
+ // unicode
+ testReadLine("\u0440\n", "\u0440", "")
+ testReadLine("\u0440\n1", "\u0440", "1")
+ testReadLine("\u0440\r", "\u0440", "")
+ testReadLine("\u0440\r2", "\u0440", "2")
+ }
+
+ private fun testReadLine(source: String, expectedLine: String?, expectedRemaining: String) {
+ val content = source.toByteArray(Charsets.UTF_8)
+ // Checks line reading over the UTF-8 bytes of `source`, delivered in two writes for every split position.
+ // no splitting -- NOTE(review): the check below is commented out, so this block only constructs the channel
+ runBlocking {
+ val ch = ByteReadChannel(content)
+// testReadLine(ch, expectedLine, expectedRemaining)
+ }
+ // NOTE(review): for an empty `source` the loop below runs zero times, so nothing is asserted at all -- confirm intended
+ // split: write content[0, splitAt), yield, then write the remainder and close while the reader consumes
+ for (splitAt in 0 until content.size) {
+ val ch = ByteChannel(true)
+ runBlocking {
+ launch(coroutineContext) {
+ ch.writeFully(content, 0, splitAt)
+ yield()
+ ch.writeFully(content, splitAt, content.size - splitAt)
+ ch.close()
+ }
+
+ testReadLine(ch, expectedLine, expectedRemaining)
+ }
+ }
+ }
+
+ private suspend fun testReadLine(ch: ByteReadChannel, expectedLine: String?, expectedRemaining: String) {
+     // The first line read from the channel must equal the expected text exactly.
+     assertEquals(expectedLine, ch.readUTF8Line())
+     // Pull whatever follows the line into an 8192-byte scratch buffer with a single readAvailable call.
+     val tail = ByteBuffer.allocate(8192)
+     val bytesRead = ch.readAvailable(tail)
+     // A result of -1 (reported as EOF below) is only acceptable when no trailing content is expected.
+     if (!expectedRemaining.isEmpty()) {
+         assertNotEquals(-1, bytesRead, "Unexpected EOF. Expected >= 0")
+     }
+     // Flip to read mode, then decode the received bytes as UTF-8 for comparison.
+     tail.flip()
+     assertEquals(expectedRemaining, Charsets.UTF_8.decode(tail).toString())
+ }
+
+ @Test
+ fun testReadLines() {
+ runBlocking {
+ writeString("Hello, World!\nLine2")
+ assertEquals("Hello, World!", channel.readASCIILine())
+ channel.close()
+ assertEquals("Line2", channel.readASCIILine())
+ }
+ }
+
+ @Test
+ fun testReadASCIILineLf() {
+ runBlocking {
+ writeParts("A", "B\n", "C")
+
+ assertEquals("AB", channel.readASCIILine())
+ assertEquals("C", channel.readASCIILine())
+ assertEquals(null, channel.readASCIILine())
+ }
+ }
+
+ @Test
+ fun testReadASCIILineCrLf() {
+ runBlocking {
+ writeParts("A", "B\r\n", "C")
+
+ assertEquals("AB", channel.readASCIILine())
+ assertEquals("C", channel.readASCIILine())
+ assertEquals(null, channel.readASCIILine())
+ }
+ }
+
+ @Test
+ fun testReadASCIILineCrLfBadSplit() {
+ runBlocking {
+ writeParts("A", "B\r", "\nC")
+
+ assertEquals("AB", channel.readASCIILine())
+ assertEquals("C", channel.readASCIILine())
+ assertEquals(null, channel.readASCIILine())
+ }
+ }
+
+ @Test
+ fun testReadASCIILineTrailingLf() {
+ runBlocking {
+ writeParts("A", "B\n", "C\n")
+
+ assertEquals("AB", channel.readASCIILine())
+ assertEquals("C", channel.readASCIILine())
+ assertEquals(null, channel.readASCIILine())
+ }
+ }
+
+ @Test
+ fun testReadASCIILineLeadingLf() {
+ runBlocking {
+ writeParts("\nA", "B\n", "C")
+
+ assertEquals("", channel.readASCIILine())
+ assertEquals("AB", channel.readASCIILine())
+ assertEquals("C", channel.readASCIILine())
+ assertEquals(null, channel.readASCIILine())
+ }
+ }
+
+ @Test
+ fun testLookAhead() {
+     val text = buildString {
+         for (i in 0 until 65535) {
+             append((i and 0xf).toString(16))
+         }
+     }.toByteArray()
+     // `text` is 65535 hexadecimal digits, so every byte is plain ASCII.
+     runBlocking {
+         launch(CommonPool) {
+             channel.writeFully(text)
+             channel.close()
+         }
+         // Cursor over the source bytes; advanced by exactly as many bytes as each read returns.
+         val comparison = ByteBuffer.wrap(text)
+
+         val arr = ByteArray(128)
+         var rem = text.size
+         val rnd = Random()
+         // Consume the stream in randomly sized requests of 1..127 bytes, capped by what is still outstanding.
+         while (rem > 0) {
+             val s = rnd.nextInt(arr.size).coerceIn(1, rem)
+             arr.fill(0)
+             val rc = channel.readAvailable(arr, 0, s)
+
+             if (rc == -1) fail("EOF")
+             // Decode both sides with the same explicit charset instead of relying on the platform default.
+             val actual = String(arr, 0, rc, Charsets.ISO_8859_1)
+
+             val expectedBytes = ByteArray(rc)
+             comparison.get(expectedBytes)
+             val expected = expectedBytes.toString(Charsets.ISO_8859_1)
+
+             assertEquals(expected, actual)
+
+             rem -= rc
+         }
+     }
+ }
+
+ @Test
+ fun testLongLinesConcurrent() {
+ val lines = (0..1024).map { size ->
+ buildString(size) {
+ for (i in 0 until size) {
+ append((i and 0xf).toString(16))
+ }
+ }
+ }
+
+ runBlocking {
+ launch(CommonPool) {
+ for (part in lines) {
+ writeString(part + "\n")
+ }
+ channel.close()
+ }
+
+ for (expected in lines) {
+ assertEquals(expected, channel.readASCIILine(expected.length))
+ }
+
+ assertNull(channel.readASCIILine())
+ }
+ }
+
+ @Test
+ fun testLongLinesSequential() {
+ val lines = (0..1024).map { size ->
+ buildString(size) {
+ for (i in 0 until size) {
+ append((i and 0xf).toString(16))
+ }
+ }
+ }
+
+ runBlocking {
+ launch(coroutineContext) {
+ for (part in lines) {
+ writeString(part + "\n")
+ yield()
+ }
+ channel.close()
+ }
+
+ for (expected in lines) {
+ Thread.yield()
+ assertEquals(expected, channel.readASCIILine(expected.length))
+ }
+
+ assertNull(channel.readASCIILine())
+ }
+ }
+
+ @Test
+ fun testReadUTF8Line2bytes() {
+ val parts = byteArrayOf(0xd0.toByte(), 0x9a.toByte(), 0x0a)
+
+ runBlocking {
+ channel.writeFully(parts)
+ assertEquals("\u041a", channel.readUTF8Line())
+ }
+ }
+
+ @Test
+ fun testReadUTF8Line3bytes() {
+ val parts = byteArrayOf(0xe0.toByte(), 0xaf.toByte(), 0xb5.toByte(), 0x0a)
+
+ runBlocking {
+ channel.writeFully(parts)
+ assertEquals("\u0BF5", channel.readUTF8Line())
+ }
+ }
+
+ @Test
+ fun testReadUTF8Line4bytes() {
+ val parts = byteArrayOf(0xF0.toByte(), 0xA6.toByte(), 0x88.toByte(), 0x98.toByte(), 0x0a)
+
+ runBlocking {
+ channel.writeFully(parts)
+ assertEquals("\uD858\uDE18", channel.readUTF8Line())
+ }
+ }
+
+ private suspend fun writeString(s: String) {
+ channel.writeFully(s.toByteArray(Charsets.ISO_8859_1))
+ }
+
+ private fun writeParts(vararg parts: String) {
+     // Writer coroutine on CommonPool; the launched job is neither returned nor joined here.
+     launch(CommonPool) {
+         for (chunk in parts) {
+             writeString(chunk)
+             yield()
+             delay(1) // short pause after each part before the next one is written
+         }
+         channel.close()
+     }
+ }
+}
\ No newline at end of file