Merge branch 'develop' into byte-channel-re
diff --git a/integration/kotlinx-coroutines-io/pom.xml b/integration/kotlinx-coroutines-io/pom.xml
new file mode 100644
index 0000000..c736bad
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>kotlinx-coroutines</artifactId>
+        <groupId>org.jetbrains.kotlinx</groupId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kotlinx-coroutines-io</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.jetbrains.kotlinx</groupId>
+            <artifactId>kotlinx-coroutines-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-test-junit</artifactId>
+            <version>${kotlin.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlinx</groupId>
+            <artifactId>kotlinx-coroutines-core</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/kotlin</sourceDirectory>
+        <testSourceDirectory>src/test/kotlin</testSourceDirectory>
+    </build>
+</project>
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBuffer.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBuffer.kt
new file mode 100644
index 0000000..42eecf4
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBuffer.kt
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.io
+
+/**
+ * Byte buffer.
+ */
+typealias ByteBuffer = java.nio.ByteBuffer
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
new file mode 100644
index 0000000..1486f22
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
@@ -0,0 +1,1308 @@
+@file:Suppress("UsePropertyAccessSyntax") // for ByteBuffer.getShort/getInt/etc
+
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.io.internal.*
+import kotlinx.coroutines.experimental.io.packet.*
+import java.nio.*
+import java.util.concurrent.atomic.*
+
+// implementation for ByteChannel
+internal class ByteBufferChannel(
+    override val autoFlush: Boolean,
+    private val pool: ObjectPool<ReadWriteBufferState.Initial> = BufferObjectPool,
+    private val reservedSize: Int = RESERVED_SIZE
+) : ByteChannel {
+    // internal constructor for reading of byte buffers
+    constructor(content: ByteBuffer) : this(false, BufferObjectNoPool, 0) {
+        state = ReadWriteBufferState.Initial(content.slice(), 0).apply {
+            capacity.resetForRead()
+        }.startWriting()
+        restoreStateAfterWrite()
+        close()
+        tryTerminate()
+    }
+
+    @Volatile
+    private var state: ReadWriteBufferState = ReadWriteBufferState.IdleEmpty
+
+    @Volatile
+    private var closed: ClosedElement? = null
+
+    @Volatile
+    private var readOp: CancellableContinuation<Boolean>? = null
+
+    @Volatile
+    private var writeOp: CancellableContinuation<Unit>? = null
+
+    private var readPosition = 0
+    private var writePosition = 0
+
+    override var readByteOrder: ByteOrder = ByteOrder.BIG_ENDIAN
+    override var writeByteOrder: ByteOrder = ByteOrder.BIG_ENDIAN
+
+    override val availableForRead: Int
+        get() = state.capacity.availableForRead
+
+    override val availableForWrite: Int
+        get() = state.capacity.availableForWrite
+
+    override val isClosedForRead: Boolean
+        get() = state === ReadWriteBufferState.Terminated
+
+    override val isClosedForWrite: Boolean
+        get() = closed != null
+
+    override fun close(cause: Throwable?): Boolean {
+        if (closed != null) return false
+        val newClosed = if (cause == null) ClosedElement.EmptyCause else ClosedElement(cause)
+        if (!Closed.compareAndSet(this, null, newClosed)) return false
+        flush()
+        if (state.capacity.isEmpty() || cause != null) tryTerminate()
+        resumeClosed(cause)
+        return true
+    }
+
+    override fun flush() {
+        if (!state.capacity.flush()) return
+        resumeReadOp()
+        if (availableForWrite > 0) resumeWriteOp()
+    }
+
+    private fun ByteBuffer.prepareBuffer(order: ByteOrder, position: Int, available: Int) {
+        require(position >= 0)
+        require(available >= 0)
+
+        val bufferLimit = capacity() - reservedSize
+        val virtualLimit = position + available
+
+        order(order)
+        limit(virtualLimit.coerceAtMost(bufferLimit))
+        position(position)
+    }
+
+    private fun setupStateForWrite(): ByteBuffer {
+        var _allocated: ReadWriteBufferState.Initial? = null
+        val (old, newState) = updateState { state ->
+            when (state) {
+                ReadWriteBufferState.IdleEmpty -> {
+                    val allocated = _allocated ?: newBuffer().also { _allocated = it }
+                    allocated.startWriting()
+                }
+                ReadWriteBufferState.Terminated -> throw closed!!.sendException
+                else -> {
+                    state.startWriting()
+                }
+            }
+        }
+        val buffer = newState.writeBuffer
+
+        _allocated?.let { allocated ->
+            if (old !== ReadWriteBufferState.IdleEmpty) {
+                releaseBuffer(allocated)
+            }
+        }
+        return buffer.apply {
+            prepareBuffer(writeByteOrder, writePosition, newState.capacity.availableForWrite)
+        }
+    }
+
+    private fun restoreStateAfterWrite() {
+        var toRelease: ReadWriteBufferState.IdleNonEmpty? = null
+
+        val (_, newState) = updateState {
+            val writeStopped = it.stopWriting()
+            if (writeStopped is ReadWriteBufferState.IdleNonEmpty && writeStopped.capacity.isEmpty()) {
+                toRelease = writeStopped
+                ReadWriteBufferState.IdleEmpty
+            } else {
+                writeStopped
+            }
+        }
+
+        if (newState === ReadWriteBufferState.IdleEmpty) {
+            toRelease?.let { releaseBuffer(it.initial) }
+        }
+    }
+
+    private fun setupStateForRead(): ByteBuffer? {
+        val (_, newState) = updateState { state ->
+            when (state) {
+                ReadWriteBufferState.Terminated -> closed!!.cause?.let { throw it } ?: return null
+                ReadWriteBufferState.IdleEmpty -> closed?.cause?.let { throw it } ?: return null
+                else -> {
+                    if (state.capacity.availableForRead == 0) return null
+                    state.startReading()
+                }
+            }
+        }
+
+        return newState.readBuffer.apply {
+            prepareBuffer(readByteOrder, readPosition, newState.capacity.availableForRead)
+        }
+    }
+
+    private fun restoreStateAfterRead() {
+        var toRelease: ReadWriteBufferState.IdleNonEmpty? = null
+
+        val (_, newState) = updateState { state ->
+            toRelease?.let {
+                it.capacity.resetForWrite()
+                resumeWriteOp()
+                toRelease = null
+            }
+
+            val readStopped = state.stopReading()
+
+            if (readStopped is ReadWriteBufferState.IdleNonEmpty) {
+                if (this.state === state && readStopped.capacity.tryLockForRelease()) {
+                    toRelease = readStopped
+                    ReadWriteBufferState.IdleEmpty
+                } else {
+                    readStopped
+                }
+            } else {
+                readStopped
+            }
+        }
+
+        if (newState === ReadWriteBufferState.IdleEmpty) {
+            toRelease?.let { releaseBuffer(it.initial) }
+            resumeWriteOp()
+        }
+    }
+
+    private fun tryTerminate() {
+        val closed = closed ?: return
+        var toRelease: ReadWriteBufferState.Initial? = null
+
+        updateState { state ->
+            when {
+                state === ReadWriteBufferState.IdleEmpty -> ReadWriteBufferState.Terminated
+                closed.cause != null && state is ReadWriteBufferState.IdleNonEmpty -> {
+                    toRelease = state.initial
+                    ReadWriteBufferState.Terminated
+                }
+                else -> return
+            }
+        }
+
+        toRelease?.let { buffer ->
+            if (state === ReadWriteBufferState.Terminated) {
+                releaseBuffer(buffer)
+            }
+        }
+
+        WriteOp.getAndSet(this, null)?.resumeWithException(closed.sendException)
+        ReadOp.getAndSet(this, null)?.apply {
+            if (closed.cause != null) resumeWithException(closed.cause) else resume(false)
+        }
+    }
+
+    private fun ByteBuffer.carryIndex(idx: Int) = if (idx >= capacity() - reservedSize) idx - (capacity() - reservedSize) else idx
+
+    private inline fun writing(block: ByteBuffer.(RingBufferCapacity) -> Unit) {
+        val buffer = setupStateForWrite()
+        val capacity = state.capacity
+        try {
+            closed?.let { throw it.sendException }
+            block(buffer, capacity)
+        } finally {
+            if (capacity.isFull() || autoFlush) flush()
+            restoreStateAfterWrite()
+            tryTerminate()
+        }
+    }
+
+    private inline fun reading(block: ByteBuffer.(RingBufferCapacity) -> Boolean): Boolean {
+        val buffer = setupStateForRead() ?: return false
+        val capacity = state.capacity
+        try {
+            if (capacity.availableForRead == 0) return false
+
+            return block(buffer, capacity)
+        } finally {
+            restoreStateAfterRead()
+            tryTerminate()
+        }
+    }
+
+    private fun readAsMuchAsPossible(dst: ByteBuffer, consumed0: Int = 0): Int {
+        var consumed = 0
+
+        val rc = reading {
+            val part = it.tryReadAtMost(minOf(remaining(), dst.remaining()))
+            if (part > 0) {
+                consumed += part
+
+                if (dst.remaining() >= remaining()) {
+                    dst.put(this)
+                } else {
+                    repeat(part) {
+                        dst.put(get())
+                    }
+                }
+
+                bytesRead(it, part)
+                true
+            } else {
+                false
+            }
+        }
+
+        return if (rc && dst.hasRemaining() && state.capacity.availableForRead > 0)
+            readAsMuchAsPossible(dst, consumed0 + consumed)
+        else consumed + consumed0
+    }
+
+    private fun readAsMuchAsPossible(dst: ByteArray, offset: Int, length: Int, consumed0: Int = 0): Int {
+        var consumed = 0
+
+        val rc = reading {
+            val part = it.tryReadAtMost(minOf(remaining(), length))
+            if (part > 0) {
+                consumed += part
+                get(dst, offset, part)
+
+                bytesRead(it, part)
+                true
+            } else {
+                false
+            }
+        }
+
+        return if (rc && consumed < length && state.capacity.availableForRead > 0)
+            readAsMuchAsPossible(dst, offset + consumed, length - consumed, consumed0 + consumed)
+        else consumed + consumed0
+    }
+
+    suspend override fun readFully(dst: ByteArray, offset: Int, length: Int) {
+        val consumed = readAsMuchAsPossible(dst, offset, length)
+
+        if (consumed < length) {
+            readFullySuspend(dst, offset + consumed, length - consumed)
+        }
+    }
+
+    suspend override fun readFully(dst: ByteBuffer): Int {
+        val rc = readAsMuchAsPossible(dst)
+        if (!dst.hasRemaining()) return rc
+
+        return readFullySuspend(dst, rc)
+    }
+
+    private suspend fun readFullySuspend(dst: ByteBuffer, rc0: Int): Int {
+        var copied = rc0
+
+        while (dst.hasRemaining()) {
+            if (!readSuspend(1)) throw ClosedReceiveChannelException("Unexpected EOF: expected ${dst.remaining()} more bytes")
+            copied += readAsMuchAsPossible(dst)
+        }
+
+        return copied
+    }
+
+    private suspend tailrec fun readFullySuspend(dst: ByteArray, offset: Int, length: Int) {
+        if (!readSuspend(1)) throw ClosedReceiveChannelException("Unexpected EOF: expected $length more bytes")
+
+        val consumed = readAsMuchAsPossible(dst, offset, length)
+
+        if (consumed < length) {
+            readFullySuspend(dst, offset + consumed, length - consumed)
+        }
+    }
+
+    suspend override fun readAvailable(dst: ByteArray, offset: Int, length: Int): Int {
+        val consumed = readAsMuchAsPossible(dst, offset, length)
+
+        return when {
+            consumed == 0 && closed != null -> -1
+            consumed > 0 || length == 0 -> consumed
+            else -> readAvailableSuspend(dst, offset, length)
+        }
+    }
+
+    suspend override fun readAvailable(dst: ByteBuffer): Int {
+        val consumed = readAsMuchAsPossible(dst)
+
+        return when {
+            consumed == 0 && closed != null -> -1
+            consumed > 0 || !dst.hasRemaining() -> consumed
+            else -> readAvailableSuspend(dst)
+        }
+    }
+
+    private suspend fun readAvailableSuspend(dst: ByteArray, offset: Int, length: Int): Int {
+        if (!readSuspend(1)) return -1
+        return readAvailable(dst, offset, length)
+    }
+
+    private suspend fun readAvailableSuspend(dst: ByteBuffer): Int {
+        if (!readSuspend(1)) return -1
+        return readAvailable(dst)
+    }
+
+    suspend override fun readPacket(size: Int): ByteReadPacket {
+        closed?.cause?.let { throw it }
+
+        if (size == 0) return ByteReadPacketEmpty
+
+        val builder = ByteWritePacketImpl(BufferPool)
+        val buffer = BufferPool.borrow()
+
+        try {
+            var remaining = size
+            while (remaining > 0) {
+                buffer.clear()
+                if (buffer.remaining() > remaining) {
+                    buffer.limit(remaining)
+                }
+
+                val rc = readFully(buffer)
+                buffer.flip()
+                builder.writeFully(buffer)
+
+                remaining -= rc
+            }
+
+            return builder.build()
+        } catch (t: Throwable) {
+            builder.release()
+            throw t
+        } finally {
+            BufferPool.recycle(buffer)
+        }
+    }
+
+    suspend override fun readByte(): Byte {
+        var b: Byte = 0
+
+        val rc = reading {
+            if (it.tryReadExact(1)) {
+                b = get()
+                bytesRead(it, 1)
+                true
+            } else false
+        }
+
+        return if (rc) {
+            b
+        } else {
+            readByteSuspend()
+        }
+    }
+
+    private suspend fun readByteSuspend(): Byte {
+        if (!readSuspend(1)) throw ClosedReceiveChannelException("EOF: one byte required")
+        return readByte()
+    }
+
+    suspend override fun readBoolean(): Boolean {
+        var b = false
+
+        val rc = reading {
+            if (it.tryReadExact(1)) {
+                b = get() != 0.toByte()
+                bytesRead(it, 1)
+                true
+            } else false
+        }
+
+        return if (rc) {
+            b
+        } else {
+            readBooleanSuspend()
+        }
+    }
+
+    private suspend fun readBooleanSuspend(): Boolean {
+        if (!readSuspend(1)) throw ClosedReceiveChannelException("EOF: one byte required")
+        return readBoolean()
+    }
+
+    suspend override fun readShort(): Short {
+        var sh: Short = 0
+
+        val rc = reading {
+            if (it.tryReadExact(2)) {
+                if (remaining() < 2) rollBytes(2)
+                sh = getShort()
+                bytesRead(it, 2)
+                true
+            } else false
+        }
+
+        return if (rc) {
+            sh
+        } else {
+            readShortSuspend()
+        }
+    }
+
+    private suspend fun readShortSuspend(): Short {
+        if (!readSuspend(2)) throw ClosedReceiveChannelException("EOF while byte expected")
+        return readShort()
+    }
+
+    suspend override fun readInt(): Int {
+        var i = 0
+
+        val rc = reading {
+            if (it.tryReadExact(4)) {
+                if (remaining() < 4) rollBytes(4)
+                i = getInt()
+                bytesRead(it, 4)
+                true
+            } else false
+        }
+
+        return if (rc) {
+            i
+        } else {
+            readIntSuspend()
+        }
+    }
+
+    private suspend fun readIntSuspend(): Int {
+        if (!readSuspend(4)) throw ClosedReceiveChannelException("EOF while an int expected")
+        return readInt()
+    }
+
+    suspend override fun readLong(): Long {
+        var i = 0L
+
+        val rc = reading {
+            if (it.tryReadExact(8)) {
+                if (remaining() < 8) rollBytes(8)
+                i = getLong()
+                bytesRead(it, 8)
+                true
+            } else false
+        }
+
+        return if (rc) {
+            i
+        } else {
+            readLongSuspend()
+        }
+    }
+
+    private suspend fun readLongSuspend(): Long {
+        if (!readSuspend(8)) throw ClosedReceiveChannelException("EOF while a long expected")
+        return readLong()
+    }
+
+    suspend override fun readDouble(): Double {
+        var d = 0.0
+
+        val rc = reading {
+            if (it.tryReadExact(8)) {
+                if (remaining() < 8) rollBytes(8)
+                d = getDouble()
+                bytesRead(it, 8)
+                true
+            } else false
+        }
+
+        return if (rc) {
+            d
+        } else {
+            readDoubleSuspend()
+        }
+    }
+
+    private suspend fun readDoubleSuspend(): Double {
+        if (!readSuspend(8)) throw ClosedReceiveChannelException("EOF while a double expected")
+        return readDouble()
+    }
+
+    suspend override fun readFloat(): Float {
+        var f = 0.0f
+
+        val rc = reading {
+            if (it.tryReadExact(4)) {
+                if (remaining() < 4) rollBytes(4)
+                f = getFloat()
+                bytesRead(it, 4)
+                true
+            } else false
+        }
+
+        return if (rc) {
+            f
+        } else {
+            readFloatSuspend()
+        }
+    }
+
+    private suspend fun readFloatSuspend(): Float {
+        if (!readSuspend(4)) throw ClosedReceiveChannelException("EOF while an int expected")
+        return readFloat()
+    }
+
+    private fun ByteBuffer.rollBytes(n: Int) {
+        limit(position() + n)
+        for (i in 1..n - remaining()) {
+            put(capacity() + ReservedLongIndex + i, get(i))
+        }
+    }
+
+    private fun ByteBuffer.carry() {
+        val base = capacity() - reservedSize
+        for (i in base until position()) {
+            put(i - base, get(i))
+        }
+    }
+
+    private fun ByteBuffer.bytesWritten(c: RingBufferCapacity, n: Int) {
+        require(n >= 0)
+
+        writePosition = carryIndex(writePosition + n)
+        c.completeWrite(n)
+    }
+
+    private fun ByteBuffer.bytesRead(c: RingBufferCapacity, n: Int) {
+        require(n >= 0)
+
+        readPosition = carryIndex(readPosition + n)
+        c.completeRead(n)
+        resumeWriteOp()
+    }
+
+    suspend override fun writeByte(b: Byte) {
+        writing {
+            tryWriteByte(b, it)
+        }
+    }
+
+    private suspend fun ByteBuffer.tryWriteByte(b: Byte, c: RingBufferCapacity) {
+        if (c.tryWriteExact(1)) {
+            put(b)
+            bytesWritten(c, 1)
+        } else {
+            writeByteSuspend(b, c)
+        }
+    }
+
+    private suspend fun ByteBuffer.writeByteSuspend(b: Byte, c: RingBufferCapacity) {
+        writeSuspend(1)
+        tryWriteByte(b, c)
+    }
+
+    suspend override fun writeShort(s: Short) {
+        writing {
+            if (!tryWriteShort(s, it)) {
+                writeShortSuspend(s, it)
+            }
+        }
+    }
+
+    private suspend fun ByteBuffer.writeShortSuspend(s: Short, c: RingBufferCapacity) {
+        writeSuspend(2)
+        tryWriteShort(s, c)
+    }
+
+    private fun ByteBuffer.tryWriteShort(s: Short, c: RingBufferCapacity): Boolean {
+        if (c.tryWriteExact(2)) {
+            if (remaining() < 2) {
+                limit(capacity())
+                putShort(s)
+                carry()
+            } else {
+                putShort(s)
+            }
+
+            bytesWritten(c, 2)
+            return true
+        }
+
+        return false
+    }
+
+    private fun ByteBuffer.tryWriteInt(i: Int, c: RingBufferCapacity): Boolean {
+        if (c.tryWriteExact(4)) {
+            if (remaining() < 4) {
+                limit(capacity())
+                putInt(i)
+                carry()
+            } else {
+                putInt(i)
+            }
+
+            bytesWritten(c, 4)
+            return true
+        }
+
+        return false
+    }
+
+    suspend override fun writeInt(i: Int) {
+        writing {
+            if (!tryWriteInt(i, it)) {
+                writeIntSuspend(i, it)
+            }
+        }
+    }
+
+    private suspend fun ByteBuffer.writeIntSuspend(i: Int, c: RingBufferCapacity) {
+        writeSuspend(4)
+        tryWriteInt(i, c)
+    }
+
+    private fun ByteBuffer.tryWriteLong(l: Long, c: RingBufferCapacity): Boolean {
+        if (c.tryWriteExact(8)) {
+            if (remaining() < 8) {
+                limit(capacity())
+                putLong(l)
+                carry()
+            } else {
+                putLong(l)
+            }
+
+            bytesWritten(c, 8)
+            return true
+        }
+
+        return false
+    }
+
+    suspend override fun writeLong(l: Long) {
+        writing {
+            if (!tryWriteLong(l, it)) {
+                writeLongSuspend(l, it)
+            }
+        }
+    }
+
+    private suspend fun ByteBuffer.writeLongSuspend(l: Long, c: RingBufferCapacity) {
+        writeSuspend(8)
+        tryWriteLong(l, c)
+    }
+
+    suspend override fun writeDouble(d: Double) {
+        writeLong(java.lang.Double.doubleToRawLongBits(d))
+    }
+
+    suspend override fun writeFloat(f: Float) {
+        writeInt(java.lang.Float.floatToRawIntBits(f))
+    }
+
+    suspend override fun writeAvailable(src: ByteBuffer): Int {
+        val copied = writeAsMuchAsPossible(src)
+        if (copied > 0) return copied
+
+        return writeLazySuspend(src)
+    }
+
+    private suspend fun writeLazySuspend(src: ByteBuffer): Int {
+        while (true) {
+            writeSuspend(1)
+            val copied = writeAvailable(src)
+            if (copied > 0) return copied
+        }
+    }
+
+    suspend override fun writeFully(src: ByteBuffer) {
+        writeAsMuchAsPossible(src)
+        if (!src.hasRemaining()) return
+
+        return writeFullySuspend(src)
+    }
+
+    private suspend fun writeFullySuspend(src: ByteBuffer) {
+        while (src.hasRemaining()) {
+            writeSuspend(1)
+            writeAsMuchAsPossible(src)
+        }
+    }
+
+    private fun writeAsMuchAsPossible(src: ByteBuffer): Int {
+        writing {
+            var written = 0
+
+            do {
+                val possibleSize = it.tryWriteAtMost(minOf(src.remaining(), remaining()))
+                if (possibleSize == 0) break
+                require(possibleSize > 0)
+
+                if (remaining() >= src.remaining()) {
+                    put(src)
+                } else {
+                    repeat(possibleSize) {
+                        put(src.get())
+                    }
+                }
+
+                written += possibleSize
+
+                prepareBuffer(writeByteOrder, carryIndex(writePosition + written), it.availableForWrite)
+            } while (true)
+
+            bytesWritten(it, written)
+
+            return written
+        }
+
+        return 0
+    }
+
+    private fun writeAsMuchAsPossible(src: ByteArray, offset: Int, length: Int): Int {
+        writing {
+            var written = 0
+
+            do {
+                val possibleSize = it.tryWriteAtMost(minOf(length - written, remaining()))
+                if (possibleSize == 0) break
+                require(possibleSize > 0)
+
+                put(src, offset + written, possibleSize)
+                written += possibleSize
+
+                prepareBuffer(writeByteOrder, carryIndex(writePosition + written), it.availableForWrite)
+            } while (true)
+
+            bytesWritten(it, written)
+
+            return written
+        }
+
+        return 0
+    }
+
+    suspend override fun writeFully(src: ByteArray, offset: Int, length: Int) {
+        var rem = length
+        var off = offset
+
+        while (rem > 0) {
+            val s = writeAsMuchAsPossible(src, off, rem)
+            if (s == 0) break
+
+            off += s
+            rem -= s
+        }
+
+        if (rem == 0) return
+
+        writeFullySuspend(src, off, rem)
+    }
+
+    private tailrec suspend fun writeFullySuspend(src: ByteArray, offset: Int, length: Int) {
+        if (length == 0) return
+        val copied = writeAvailable(src, offset, length)
+        return writeFullySuspend(src, offset + copied, length - copied)
+    }
+
+    suspend override fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int {
+        val size = writeAsMuchAsPossible(src, offset, length)
+        if (size > 0) return size
+        return writeSuspend(src, offset, length)
+    }
+
+    private suspend fun writeSuspend(src: ByteArray, offset: Int, length: Int): Int {
+        while (true) {
+            writeSuspend(1)
+            val size = writeAsMuchAsPossible(src, offset, length)
+            if (size > 0) return size
+        }
+    }
+
+    suspend override fun writePacket(packet: ByteReadPacket) {
+        closed?.sendException?.let { packet.release(); throw it }
+
+        when (packet) {
+            is ByteReadPacketEmpty -> return
+            is ByteReadPacketSingle -> writeSingleBufferPacket(packet)
+            is ByteReadPacketImpl -> writeMultipleBufferPacket(packet)
+            else -> writeExternalPacket(packet)
+        }
+    }
+
+    private suspend fun writeSingleBufferPacket(packet: ByteReadPacketSingle) {
+        val buffer = packet.steal()
+        val t = try {
+            writeAsMuchAsPossible(buffer)
+            null
+        } catch (t: Throwable) {
+            t
+        }
+
+        if (t != null) {
+            packet.pool.recycle(buffer)
+            throw t
+        }
+
+        if (buffer.hasRemaining()) {
+            return writeSingleBufferPacketSuspend(buffer, packet)
+        }
+
+        packet.pool.recycle(buffer)
+    }
+
+    private suspend fun writeMultipleBufferPacket(packet: ByteReadPacketImpl) {
+        var buffer: ByteBuffer? = null
+
+        val t = try {
+            while (packet.remaining > 0) {
+                buffer = packet.steal()
+                writeAsMuchAsPossible(buffer)
+                if (buffer.hasRemaining()) break
+                packet.pool.recycle(buffer)
+            }
+            null
+        } catch (t: Throwable) { t }
+
+        if (t != null) {
+            buffer?.let { packet.pool.recycle(it) }
+            packet.release()
+            throw t
+        }
+
+        if (buffer != null) {
+            return writeMultipleBufferPacketSuspend(buffer, packet)
+        }
+
+        packet.release()
+    }
+
+    private suspend fun writeSingleBufferPacketSuspend(buffer: ByteBuffer, packet: ByteReadPacketSingle) {
+        try {
+            writeFully(buffer)
+        } finally {
+            packet.pool.recycle(buffer)
+        }
+    }
+
+    private suspend fun writeMultipleBufferPacketSuspend(rem: ByteBuffer, packet: ByteReadPacketImpl) {
+        var buffer = rem
+
+        try {
+            do {
+                writeFully(buffer)
+                if (packet.remaining == 0) break
+                packet.pool.recycle(buffer)
+                buffer = packet.steal()
+            } while (true)
+        } finally {
+            packet.pool.recycle(buffer)
+            packet.release()
+        }
+    }
+
+    private suspend fun writeExternalPacket(packet: ByteReadPacket) {
+        val buffer = BufferPool.borrow()
+        val t = try {
+            while (packet.remaining > 0) {
+                buffer.clear()
+                packet.readLazy(buffer)
+                buffer.flip()
+                writeAsMuchAsPossible(buffer)
+                if (buffer.hasRemaining()) {
+                    buffer.compact()
+                    break
+                }
+            }
+
+            null
+        } catch (t: Throwable) {
+            t
+        }
+
+        buffer.flip()
+        if (buffer.hasRemaining()) {
+            return writeExternalPacketSuspend(buffer, packet)
+        }
+
+        BufferPool.recycle(buffer)
+        packet.release()
+
+        if (t != null) throw t
+    }
+
+    private suspend fun writeExternalPacketSuspend(buffer: ByteBuffer, packet: ByteReadPacket) {
+        try {
+            do {
+                buffer.compact()
+                packet.readLazy(buffer)
+                buffer.flip()
+                writeFully(buffer)
+            } while (packet.remaining > 0)
+        } finally {
+            BufferPool.recycle(buffer)
+            packet.release()
+        }
+    }
+
+    /**
+     * Invokes [visitor] for every available batch until all bytes processed or visitor if visitor returns false.
+     * Never invokes [visitor] with empty buffer unless [last] = true. Invokes visitor with last = true at most once
+     * even if there are remaining bytes and visitor returned true.
+     */
+    override suspend fun lookAhead(visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean) {
+        if (lookAheadFast(false, visitor)) return
+        lookAheadSuspend(visitor)
+    }
+
+    private inline fun lookAheadFast(last: Boolean, visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean): Boolean {
+        if (state === ReadWriteBufferState.Terminated && !last) return false
+
+        val rc = reading {
+            do {
+                val available = state.capacity.availableForRead
+
+                val rem = if (available > 0 || last) {
+                    if (!visitor(this, last)) {
+                        afterBufferVisited(this, it)
+                        return true
+                    }
+
+                    val consumed = afterBufferVisited(this, it)
+                    available - consumed
+                } else 0
+            } while (rem > 0 && !last)
+
+            last
+        }
+
+        if (!rc && closed != null) {
+            visitor(EmptyByteBuffer, true)
+        }
+
+        return rc
+    }
+
+    private suspend fun lookAheadSuspend(visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean): Boolean {
+        var last = false
+
+        do {
+            if (lookAheadFast(last, visitor)) return true
+            if (last) return false
+            if (!readSuspend(1)) {
+                last = true
+            }
+        } while (true)
+    }
+
+    private fun afterBufferVisited(buffer: ByteBuffer, c: RingBufferCapacity): Int {
+        val consumed = buffer.position() - readPosition
+        if (consumed > 0) {
+            if (!c.tryReadExact(consumed)) throw IllegalStateException("Consumed more bytes than available")
+
+            buffer.bytesRead(c, consumed)
+            buffer.prepareBuffer(readByteOrder, readPosition, c.availableForRead)
+        }
+
+        return consumed
+    }
+
+    private suspend fun readUTF8LineToAscii(out: Appendable, limit: Int): Boolean {
+        if (state === ReadWriteBufferState.Terminated) return false
+
+        var cr = false
+        var consumed = 0
+        var unicodeStarted = false
+        var eol = false
+
+        lookAheadFast(false) { buffer, last ->
+            var forceConsume = false
+
+            val rejected = !buffer.decodeASCII { ch ->
+                when {
+                    ch == '\r' -> {
+                        cr = true
+                        true
+                    }
+                    ch == '\n' -> {
+                        eol = true
+                        forceConsume = true
+                        false
+                    }
+                    cr -> {
+                        cr = false
+                        eol = true
+                        false
+                    }
+                    else -> {
+                        if (consumed == limit) throw BufferOverflowException()
+                        consumed++
+                        out.append(ch)
+                        true
+                    }
+                }
+            }
+
+            if (cr && last) {
+                eol = true
+            }
+
+            if (eol && forceConsume) {
+                buffer.position(buffer.position() + 1)
+            }
+
+            if (rejected && buffer.hasRemaining() && !eol) {
+                unicodeStarted = true
+                false
+            } else
+                !eol && !last
+        }
+
+        if (eol && !unicodeStarted) return true
+        return readUTF8LineToUtf8(out, limit - consumed, cr, consumed)
+    }
+
+    private suspend fun readUTF8LineToUtf8(out: Appendable, limit: Int, cr0: Boolean, consumed0: Int): Boolean {
+        var cr1 = cr0
+        var consumed1 = 0
+        var eol = false
+
+        lookAheadFast(false) { buffer, last ->
+            var forceConsume = false
+
+            val rc = buffer.decodeUTF8 { ch ->
+                when {
+                    ch == '\r' -> {
+                        cr1 = true
+                        true
+                    }
+                    ch == '\n' -> {
+                        eol = true
+                        forceConsume = true
+                        false
+                    }
+                    cr1 -> {
+                        cr1 = false
+                        eol = true
+                        false
+                    }
+                    else -> {
+                        if (consumed1 == limit) throw BufferOverflowException()
+                        consumed1++
+                        out.append(ch)
+                        true
+                    }
+                }
+            }
+
+            if (cr1 && last) {
+                eol = true
+            }
+
+            if (eol && forceConsume) {
+                buffer.position(buffer.position() + 1)
+            }
+
+            rc != 0 && !eol && !last
+        }
+
+        if (eol) return true
+
+        return readUTF8LineToUtf8Suspend(out, limit, cr1, consumed1 + consumed0)
+    }
+
+    private suspend fun readUTF8LineToUtf8Suspend(out: Appendable, limit: Int, cr0: Boolean, consumed0: Int): Boolean {
+        var cr1 = cr0
+        var consumed1 = 0
+        var eol = false
+        var wrap = 0
+
+        lookAheadSuspend { buffer, last ->
+            var forceConsume = false
+
+            val rc = buffer.decodeUTF8 { ch ->
+                when {
+                    ch == '\r' -> {
+                        cr1 = true
+                        true
+                    }
+                    ch == '\n' -> {
+                        eol = true
+                        forceConsume = true
+                        false
+                    }
+                    cr1 -> {
+                        cr1 = false
+                        eol = true
+                        false
+                    }
+                    else -> {
+                        if (consumed1 == limit) throw BufferOverflowException()
+                        consumed1++
+                        out.append(ch)
+                        true
+                    }
+                }
+            }
+
+            if (cr1 && last) {
+                eol = true
+            }
+
+            if (eol && forceConsume) {
+                buffer.position(buffer.position() + 1)
+            }
+
+            wrap = maxOf(0, rc)
+
+            wrap == 0 && !eol && !last
+        }
+
+        if (wrap != 0) {
+            if (!readSuspend(wrap)) {
+
+            }
+
+            return readUTF8LineToUtf8Suspend(out, limit, cr1, consumed1)
+        }
+
+        return (consumed1 > 0 || consumed0 > 0 || eol)
+    }
+
+    suspend override fun <A : Appendable> readUTF8LineTo(out: A, limit: Int) = readUTF8LineToAscii(out, limit)
+
+    private fun resumeReadOp() {
+        ReadOp.getAndSet(this, null)?.resume(true)
+    }
+
+    private fun resumeWriteOp() {
+        WriteOp.getAndSet(this, null)?.apply {
+            val closed = closed
+            if (closed == null) resume(Unit) else resumeWithException(closed.sendException)
+        }
+    }
+
+    private fun resumeClosed(cause: Throwable?) {
+        ReadOp.getAndSet(this, null)?.let { c ->
+            if (cause != null)
+                c.resumeWithException(cause)
+            else
+                c.resume(state.capacity.availableForRead > 0)
+        }
+
+        WriteOp.getAndSet(this, null)?.tryResumeWithException(cause ?: ClosedSendChannelException(null))
+    }
+
+    private tailrec suspend fun readSuspend(size: Int): Boolean {
+        if (state.capacity.availableForRead >= size) return true
+
+        closed?.let { c ->
+            if (c.cause == null) return false
+            throw c.cause
+        }
+
+        if (!readSuspendImpl(size)) return false
+
+        return readSuspend(size)
+    }
+
+    private suspend fun readSuspendImpl(size: Int): Boolean {
+        if (state.capacity.availableForRead >= size) return true
+
+        return suspendCancellableCoroutine(holdCancellability = true) { c ->
+            do {
+                if (state.capacity.availableForRead >= size) {
+                    c.resume(true)
+                    break
+                }
+
+                closed?.let {
+                    if (it.cause == null && state.capacity.availableForRead == 0) {
+                        c.resume(false)
+                        return@suspendCancellableCoroutine
+                    } else if (it.cause != null) {
+                        c.resumeWithException(it.cause)
+                        return@suspendCancellableCoroutine
+                    }
+                }
+            } while (!setContinuation({ readOp }, ReadOp, c, { closed == null && state.capacity.availableForRead < size }))
+        }
+    }
+
+    private suspend fun writeSuspend(size: Int) {
+        closed?.sendException?.let { throw it }
+
+        while (state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty && closed == null) {
+            suspendCancellableCoroutine<Unit>(holdCancellability = true) { c ->
+                do {
+                    closed?.sendException?.let { throw it }
+                    if (state.capacity.availableForWrite >= size || state === ReadWriteBufferState.IdleEmpty) {
+                        c.resume(Unit)
+                        break
+                    }
+                } while (!setContinuation({ writeOp }, WriteOp, c, { closed == null && state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty }))
+
+                flush()
+            }
+        }
+
+        closed?.sendException?.let { throw it }
+    }
+
+    private inline fun <T, C : CancellableContinuation<T>> setContinuation(getter: () -> C?, updater: AtomicReferenceFieldUpdater<ByteBufferChannel, C?>, continuation: C, predicate: () -> Boolean): Boolean {
+        while (true) {
+            val current = getter()
+            if (current != null) throw IllegalStateException("Operation is already in progress")
+
+            if (!predicate()) {
+                return false
+            }
+
+            if (updater.compareAndSet(this, null, continuation)) {
+                if (predicate() || !updater.compareAndSet(this, continuation, null)) {
+                    continuation.initCancellability()
+                    return true
+                }
+
+                return false
+            }
+        }
+    }
+
+    private fun newBuffer(): ReadWriteBufferState.Initial {
+        val result = pool.borrow()
+
+        result.readBuffer.order(readByteOrder)
+        result.writeBuffer.order(writeByteOrder)
+        result.capacity.resetForWrite()
+
+        return result
+    }
+
+    private fun releaseBuffer(buffer: ReadWriteBufferState.Initial) {
+        pool.recycle(buffer)
+    }
+
+    // todo: replace with atomicfu
+    private inline fun updateState(block: (ReadWriteBufferState) -> ReadWriteBufferState?):
+        Pair<ReadWriteBufferState, ReadWriteBufferState> = update({ state }, State, block)
+
+    // todo: replace with atomicfu
+    private inline fun <T : Any> update(getter: () -> T, updater: AtomicReferenceFieldUpdater<ByteBufferChannel, T>, block: (old: T) -> T?): Pair<T, T> {
+        while (true) {
+            val old = getter()
+            val newValue = block(old) ?: continue
+            if (old === newValue || updater.compareAndSet(this, old, newValue)) return Pair(old, newValue)
+        }
+    }
+
+    companion object {
+
+        private const val ReservedLongIndex = -8
+
+        // todo: replace with atomicfu, remove companion object
+        private val State = updater(ByteBufferChannel::state)
+        private val WriteOp = updater(ByteBufferChannel::writeOp)
+        private val ReadOp = updater(ByteBufferChannel::readOp)
+        private val Closed = updater(ByteBufferChannel::closed)
+    }
+
+    private class ClosedElement(val cause: Throwable?) {
+        val sendException: Throwable
+            get() = cause ?: ClosedWriteChannelException("The channel was closed")
+
+        companion object {
+            val EmptyCause = ClosedElement(null)
+        }
+    }
+}
+
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteChannel.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteChannel.kt
new file mode 100644
index 0000000..ebcfc1b
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteChannel.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.io.internal.EmptyByteBuffer
+
+/**
+ * Channel for asynchronous reading and writing of sequences of bytes.
+ * This is a buffered **single-reader single-writer channel**.
+ *
+ * Read operations can be invoked concurrently with write operations, but multiple reads or multiple writes
+ * cannot be invoked concurrently with themselves. Exceptions are [close] and [flush] which can be invoked
+ * concurrently with any other operations and between themselves at any time.
+ */
+interface ByteChannel : ByteReadChannel, ByteWriteChannel
+
+/**
+ * Creates buffered channel for asynchronous reading and writing of sequences of bytes.
+ */
+public fun ByteChannel(autoFlush: Boolean = false): ByteChannel =
+    ByteBufferChannel(autoFlush)
+
+/**
+ * Creates channel for reading from the specified byte buffer.
+ */
+public fun ByteReadChannel(content: ByteBuffer): ByteReadChannel =
+    ByteBufferChannel(content)
+
+/**
+ * Creates channel for reading from the specified byte array.
+ */
+public fun ByteReadChannel(content: ByteArray): ByteReadChannel =
+    ByteBufferChannel(ByteBuffer.wrap(content))
+
+
+/**
+ * Byte channel that is always empty.
+ */
+val EmptyByteReadChannel: ByteReadChannel = ByteReadChannel(EmptyByteBuffer)
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteChannelCoroutine.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteChannelCoroutine.kt
new file mode 100644
index 0000000..f4803e5
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteChannelCoroutine.kt
@@ -0,0 +1,17 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+internal open class ByteChannelCoroutine(
+    parentContext: CoroutineContext,
+    open val channel: ByteChannel
+) : AbstractCoroutine<Unit>(parentContext, active = true) {
+    override fun afterCompletion(state: Any?, mode: Int) {
+        val cause = (state as? JobSupport.CompletedExceptionally)?.cause
+        if (!channel.close(cause) && cause != null)
+            handleCoroutineException(context, cause)
+
+        super.afterCompletion(state, mode)
+    }
+}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteOrder.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteOrder.kt
new file mode 100644
index 0000000..bad461f
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteOrder.kt
@@ -0,0 +1,6 @@
+package kotlinx.coroutines.experimental.io
+
+/**
+ * Byte order.
+ */
+typealias ByteOrder = java.nio.ByteOrder
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt
new file mode 100644
index 0000000..d455443
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt
@@ -0,0 +1,132 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.io.internal.BufferObjectPool
+import kotlinx.coroutines.experimental.io.packet.ByteReadPacket
+
+/**
+ * Channel for asynchronous reading of sequences of bytes.
+ * This is a **single-reader channel**.
+ *
+ * Operations on this channel cannot be invoked concurrently.
+ */
+public interface ByteReadChannel {
+    /**
+     * Returns number of bytes that can be read without suspension. Read operations do no suspend and return
+     * immediately when this number is at least the number of bytes requested for read.
+     */
+    public val availableForRead: Int
+
+    /**
+     * Returns `true` if the channel is closed and no remaining bytes are available for read.
+     * It implies that [availableForRead] is zero.
+     */
+    public val isClosedForRead: Boolean
+
+    /**
+     * Byte order that is used for multi-byte read operations
+     * (such as [readShort], [readInt], [readLong], [readFloat], and [readDouble]).
+     */
+    public var readByteOrder: ByteOrder
+
+    /**
+     * Reads all available bytes to [dst] buffer and returns immediately or suspends if no bytes available
+     * @return number of bytes were read or `-1` if the channel has been closed
+     */
+    suspend fun readAvailable(dst: ByteArray, offset: Int, length: Int): Int
+    suspend fun readAvailable(dst: ByteArray) = readAvailable(dst, 0, dst.size)
+    suspend fun readAvailable(dst: ByteBuffer): Int
+
+    /**
+     * Reads all [length] bytes to [dst] buffer or fails if channel has been closed.
+     * Suspends if not enough bytes available.
+     */
+    suspend fun readFully(dst: ByteArray, offset: Int, length: Int)
+    suspend fun readFully(dst: ByteArray) = readFully(dst, 0, dst.size)
+    suspend fun readFully(dst: ByteBuffer): Int
+
+    /**
+     * Reads the specified amount of bytes and makes a byte packet from them. Fails if channel has been closed
+     * and not enough bytes available.
+     */
+    suspend fun readPacket(size: Int): ByteReadPacket
+
+    /**
+     * Reads a long number (suspending if not enough bytes available) or fails if channel has been closed
+     * and not enough bytes.
+     */
+    suspend fun readLong(): Long
+
+    /**
+     * Reads an int number (suspending if not enough bytes available) or fails if channel has been closed
+     * and not enough bytes.
+     */
+    suspend fun readInt(): Int
+
+    /**
+     * Reads a short number (suspending if not enough bytes available) or fails if channel has been closed
+     * and not enough bytes.
+     */
+    suspend fun readShort(): Short
+
+    /**
+     * Reads a byte (suspending if no bytes available yet) or fails if channel has been closed
+     * and not enough bytes.
+     */
+    suspend fun readByte(): Byte
+
+    /**
+     * Reads a boolean value (suspending if no bytes available yet) or fails if channel has been closed
+     * and not enough bytes.
+     */
+    suspend fun readBoolean(): Boolean
+
+    /**
+     * Reads double number (suspending if not enough bytes available) or fails if channel has been closed
+     * and not enough bytes.
+     */
+    suspend fun readDouble(): Double
+
+    /**
+     * Reads float number (suspending if not enough bytes available) or fails if channel has been closed
+     * and not enough bytes.
+     */
+    suspend fun readFloat(): Float
+
+    suspend fun lookAhead(visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean)
+
+    /**
+     * Reads a line of UTF-8 characters to the specified [out] buffer up to [limit] characters.
+     * Supports both CR-LF and LF line endings.
+     * Throws an exception if the specified [limit] has been exceeded.
+     *
+     * @return `true` if line has been read (possibly empty) or `false` if channel has been closed
+     * and no characters were read.
+     */
+    suspend fun <A : Appendable> readUTF8LineTo(out: A, limit: Int = Int.MAX_VALUE): Boolean
+}
+
+suspend fun ByteReadChannel.copyAndClose(dst: ByteWriteChannel): Long {
+    val o = BufferObjectPool.borrow()
+    try {
+        var copied = 0L
+        val bb = o.backingBuffer
+
+        while (true) {
+            bb.clear()
+            val size = readAvailable(bb)
+            if (size == -1) break
+
+            bb.flip()
+            dst.writeFully(bb)
+            copied += size
+        }
+
+        dst.close()
+        return copied
+    } catch (t: Throwable) {
+        dst.close(t)
+        throw t
+    } finally {
+        BufferObjectPool.recycle(o)
+    }
+}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteWriteChannel.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteWriteChannel.kt
new file mode 100644
index 0000000..d654e29
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteWriteChannel.kt
@@ -0,0 +1,163 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.io.packet.*
+import java.nio.ByteBuffer
+import java.nio.CharBuffer
+import java.util.concurrent.CancellationException
+
+/**
+ * Channel for asynchronous writing of sequences of bytes.
+ * This is a **single-writer channel**.
+ *
+ * Operations on this channel cannot be invoked concurrently, unless explicitly specified otherwise
+ * in description. Exceptions are [close] and [flush].
+ */
+public interface ByteWriteChannel {
+    /**
+     * Returns number of bytes that can be written without suspension. Write operations do no suspend and return
+     * immediately when this number is at least the number of bytes requested for write.
+     */
+    public val availableForWrite: Int
+
+    /**
+     * Returns `true` is channel has been closed and attempting to write to the channel will cause an exception.
+     */
+    public val isClosedForWrite: Boolean
+
+    /**
+     * Returns `true` if channel flushes automatically all pending bytes after every write function call.
+     * If `false` then flush only happens at manual [flush] invocation or when the buffer is full.
+     */
+    public val autoFlush: Boolean
+
+    /**
+     * Byte order that is used for multi-byte write operations
+     * (such as [writeShort], [writeInt], [writeLong], [writeFloat], and [writeDouble]).
+     */
+    public var writeByteOrder: ByteOrder
+
+    /**
+     * Writes as much as possible and only suspends if buffer is full
+     */
+    suspend fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int
+    suspend fun writeAvailable(src: ByteArray) = writeAvailable(src, 0, src.size)
+    suspend fun writeAvailable(src: ByteBuffer): Int
+
+    /**
+     * Writes all [src] bytes and suspends until all bytes written. Causes flush if buffer filled up or when [autoFlush]
+     * Crashes if channel get closed while writing.
+     */
+    suspend fun writeFully(src: ByteArray, offset: Int, length: Int)
+    suspend fun writeFully(src: ByteArray) = writeFully(src, 0, src.size)
+    suspend fun writeFully(src: ByteBuffer)
+
+    /**
+     * Writes a [packet] fully or fails if channel get closed before the whole packet has been written
+     */
+    suspend fun writePacket(packet: ByteReadPacket)
+    /**
+     * Writes long number and suspends until written.
+     * Crashes if channel get closed while writing.
+     */
+    suspend fun writeLong(l: Long)
+
+    /**
+     * Writes int number and suspends until written.
+     * Crashes if channel get closed while writing.
+     */
+    suspend fun writeInt(i: Int)
+
+    /**
+     * Writes short number and suspends until written.
+     * Crashes if channel get closed while writing.
+     */
+    suspend fun writeShort(s: Short)
+
+    /**
+     * Writes byte and suspends until written.
+     * Crashes if channel get closed while writing.
+     */
+    suspend fun writeByte(b: Byte)
+
+    /**
+     * Writes double number and suspends until written.
+     * Crashes if channel get closed while writing.
+     */
+    suspend fun writeDouble(d: Double)
+
+    /**
+     * Writes float number and suspends until written.
+     * Crashes if channel get closed while writing.
+     */
+    suspend fun writeFloat(f: Float)
+
+    /**
+     * Closes this channel with an optional exceptional [cause].
+     * It flushes all pending write bytes (via [flush]).
+     * This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
+     *
+     * A channel that was closed without a [cause], is considered to be _closed normally_.
+     * A channel that was closed with non-null [cause] is called a _failed channel_. Attempts to read or
+     * write on a failed channel throw this cause exception.
+     *
+     * After invocation of this operation [isClosedForWrite] starts returning `true` and
+     * all subsequent write operations throw [ClosedWriteChannelException] or the specified [cause].
+     * However, [isClosedForRead][ByteReadChannel.isClosedForRead] on the side of [ByteReadChannel]
+     * starts returning `true` only after all written bytes have been read.
+     */
+    public fun close(cause: Throwable? = null): Boolean
+
+    /**
+     * Flushes all pending write bytes making them available for read.
+     *
+     * This function is thread-safe and can be invoked in any thread at any time.
+     * It does nothing when invoked on a closed channel.
+     */
+    public fun flush()
+}
+
+suspend fun ByteWriteChannel.writeShort(s: Int) {
+    writeShort((s and 0xffff).toShort())
+}
+
+suspend fun ByteWriteChannel.writeByte(b: Int) {
+    writeByte((b and 0xff).toByte())
+}
+
+suspend fun ByteWriteChannel.writeInt(i: Long) {
+    writeInt(i.toInt())
+}
+
+suspend fun ByteWriteChannel.writeStringUtf8(s: CharSequence) {
+    val packet = buildPacket {
+        writeStringUtf8(s)
+    }
+    writePacket(packet)
+}
+
+suspend fun ByteWriteChannel.writeStringUtf8(s: CharBuffer) {
+    val packet = buildPacket {
+        writeStringUtf8(s)
+    }
+    writePacket(packet)
+}
+
+suspend fun ByteWriteChannel.writeStringUtf8(s: String) {
+    val packet = buildPacket {
+        writeStringUtf8(s)
+    }
+    writePacket(packet)
+}
+
+suspend fun ByteWriteChannel.writeBoolean(b: Boolean) {
+    writeByte(if (b) 1 else 0)
+}
+
+/**
+ * Writes UTF16 character
+ */
+suspend fun ByteWriteChannel.writeChar(ch: Char) {
+    writeShort(ch.toInt())
+}
+
+class ClosedWriteChannelException(message: String?) : CancellationException(message)
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/MigrationUtils.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/MigrationUtils.kt
new file mode 100644
index 0000000..e4815c2
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/MigrationUtils.kt
@@ -0,0 +1,39 @@
+package kotlinx.coroutines.experimental.io
+
+/**
+ * See [java.io.DataOutput.writeUTF]
+ */
+@Deprecated("Use writeStringUtf8 instead", ReplaceWith("writeStringUtf8(s)"))
+suspend fun ByteWriteChannel.writeUTF(s: String) {
+    writeStringUtf8(s)
+}
+
+/**
+ * See [java.io.DataOutput.writeChars]
+ */
+suspend fun ByteWriteChannel.writeChars(s: String) {
+    for (ch in s) {
+        writeShort(ch.toInt())
+    }
+}
+
+/**
+ * See [java.io.DataOutput.writeBytes]
+ */
+suspend fun ByteWriteChannel.writeBytes(s: String) {
+    for (ch in s) {
+        writeByte(ch.toInt())
+    }
+}
+
+/**
+ * See [java.io.DataOutput.write]
+ */
+@Deprecated("Use writeFully instead", ReplaceWith("writeFully(src)"))
+suspend fun ByteWriteChannel.write(src: ByteArray) = writeFully(src)
+
+/**
+ * See [java.io.DataOutput.write]
+ */
+@Deprecated("Use writeFully instead", ReplaceWith("writeFully(src, offset, length)"))
+suspend fun ByteWriteChannel.write(src: ByteArray, offset: Int, length: Int) = writeFully(src, offset, length)
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ReaderJob.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ReaderJob.kt
new file mode 100644
index 0000000..e88adb5
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ReaderJob.kt
@@ -0,0 +1,34 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * A coroutine job that is reading from a byte channel
+ */
+interface ReaderJob : Job {
+    /**
+     * A reference to the channel that this coroutine is reading from
+     */
+    val channel: ByteWriteChannel
+}
+
+interface ReaderScope : CoroutineScope {
+    val channel: ByteReadChannel
+}
+
+fun reader(coroutineContext: CoroutineContext,
+           channel: ByteChannel,
+           block: suspend ReaderScope.() -> Unit): ReaderJob {
+    val coroutine = ReaderCoroutine(newCoroutineContext(coroutineContext), channel)
+    coroutine.initParentJob(coroutineContext[Job])
+    block.startCoroutine(coroutine, coroutine)
+    return coroutine
+}
+
+fun reader(coroutineContext: CoroutineContext,
+           autoFlush: Boolean = false,
+           block: suspend ReaderScope.() -> Unit): ReaderJob = reader(coroutineContext, ByteChannel(autoFlush), block)
+
+private class ReaderCoroutine(context: CoroutineContext, channel: ByteChannel)
+    : ByteChannelCoroutine(context, channel), ReaderJob, ReaderScope
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/Strings.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/Strings.kt
new file mode 100644
index 0000000..c6923cb
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/Strings.kt
@@ -0,0 +1,14 @@
+package kotlinx.coroutines.experimental.io
+
+suspend fun ByteReadChannel.readASCIILine(estimate: Int = 16, limit: Int = Int.MAX_VALUE): String? {
+    val sb = StringBuilder(estimate)
+    return if (readUTF8LineTo(sb, limit)) sb.toString() else null
+}
+
+suspend fun ByteReadChannel.readUTF8Line(estimate: Int = 16, limit: Int = Int.MAX_VALUE): String? {
+    val sb = StringBuilder(estimate)
+    return if (readUTF8LineTo(sb, limit)) sb.toString() else null
+}
+
+@Deprecated("Use readUTF8Line or readASCIILine instead", ReplaceWith("readUTF8Line(estimate, limit)"))
+suspend fun ByteReadChannel.readLine(estimate: Int = 16, limit: Int = Int.MAX_VALUE): String? = readUTF8Line(estimate, limit)
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/WriterJob.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/WriterJob.kt
new file mode 100644
index 0000000..00e2f71
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/WriterJob.kt
@@ -0,0 +1,35 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * A coroutine job that is writing to a byte channel
+ */
+interface WriterJob : Job {
+    /**
+     * A reference to the channel that this coroutine is writing to
+     */
+    val channel: ByteReadChannel
+}
+
+interface WriterScope : CoroutineScope {
+    val channel: ByteWriteChannel
+}
+
+fun writer(coroutineContext: CoroutineContext,
+           channel: ByteChannel,
+           block: suspend CoroutineScope.() -> Unit): WriterJob {
+    val coroutine = WriterCoroutine(newCoroutineContext(coroutineContext), channel)
+    coroutine.initParentJob(coroutineContext[Job])
+    block.startCoroutine(coroutine, coroutine)
+    return coroutine
+}
+
+fun writer(coroutineContext: CoroutineContext,
+           autoFlush: Boolean = false,
+           block: suspend CoroutineScope.() -> Unit): WriterJob = writer(coroutineContext, ByteChannel(autoFlush), block)
+
+private class WriterCoroutine(ctx: CoroutineContext, channel: ByteChannel)
+    : ByteChannelCoroutine(ctx, channel), WriterScope, WriterJob
+
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/ObjectPool.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/ObjectPool.kt
new file mode 100644
index 0000000..822a1c8
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/ObjectPool.kt
@@ -0,0 +1,144 @@
+package kotlinx.coroutines.experimental.io.internal
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicReferenceArray
+
+internal interface ObjectPool<T : Any> {
+    val capacity: Int
+    fun borrow(): T
+    fun recycle(instance: T) // can only recycle what was borrowed before
+    fun dispose()
+}
+
+internal val BUFFER_SIZE = getIOIntProperty("BufferSize", 4096)
+private val BUFFER_POOL_SIZE = getIOIntProperty("BufferPoolSize", 2048)
+private val BUFFER_OBJECT_POOL_SIZE = getIOIntProperty("BufferObjectPoolSize", 1024)
+
+// ------------- standard shared pool objects -------------
+
+internal val BufferPool: ObjectPool<ByteBuffer> =
+    object : ObjectPoolImpl<ByteBuffer>(BUFFER_POOL_SIZE) {
+        override fun produceInstance(): ByteBuffer =
+            ByteBuffer.allocateDirect(BUFFER_SIZE)
+        override fun clearInstance(instance: ByteBuffer): ByteBuffer =
+            instance.also { it.clear() }
+        override fun validateInstance(instance: ByteBuffer) {
+            require(instance.capacity() == BUFFER_SIZE)
+        }
+    }
+
+internal val BufferObjectPool: ObjectPool<ReadWriteBufferState.Initial> =
+    object: ObjectPoolImpl<ReadWriteBufferState.Initial>(BUFFER_OBJECT_POOL_SIZE) {
+        override fun produceInstance() =
+            ReadWriteBufferState.Initial(BufferPool.borrow())
+        override fun disposeInstance(instance: ReadWriteBufferState.Initial) {
+            BufferPool.recycle(instance.backingBuffer)
+        }
+    }
+
+internal val BufferObjectNoPool: ObjectPool<ReadWriteBufferState.Initial> =
+    object : NoPoolImpl<ReadWriteBufferState.Initial>() {
+        override fun borrow(): ReadWriteBufferState.Initial =
+            ReadWriteBufferState.Initial(ByteBuffer.allocateDirect(BUFFER_SIZE))
+    }
+
+// ------------- ObjectPoolImpl -------------
+
+private const val MULTIPLIER = 4
+private const val PROBE_COUNT = 8 // number of attepts to find a slot
+private const val MAGIC = 2654435769.toInt() // fractional part of golden ratio
+private const val MAX_CAPACITY = Int.MAX_VALUE / MULTIPLIER
+
+internal abstract class ObjectPoolImpl<T : Any>(final override val capacity: Int) : ObjectPool<T> {
+    init {
+        require(capacity > 0) { "capacity should be positive but it is $capacity" }
+        require(capacity <= MAX_CAPACITY) { "capacity should be less or equal to $MAX_CAPACITY but it is $capacity"}
+    }
+
+    protected abstract fun produceInstance(): T // factory
+    protected open fun clearInstance(instance: T) = instance // optional cleaning of poped items
+    protected open fun validateInstance(instance: T) {} // optional validation for recycled items
+    protected open fun disposeInstance(instance: T) {} // optional destruction of unpoolable items
+
+    @Volatile
+    private var top: Long = 0L
+
+    // closest power of 2 that is equal or larger than capacity * MULTIPLIER
+    private val maxIndex = Integer.highestOneBit(capacity * MULTIPLIER - 1) * 2
+    private val shift = Integer.numberOfLeadingZeros(maxIndex) + 1 // for hash function
+
+    // zero index is reserved for both
+    private val instances = AtomicReferenceArray<T?>(maxIndex + 1)
+    private val next = IntArray(maxIndex + 1)
+
+    override fun borrow(): T =
+        tryPop()?.let { clearInstance(it) } ?: produceInstance()
+
+    override fun recycle(instance: T) {
+        validateInstance(instance)
+        if (!tryPush(instance)) disposeInstance(instance)
+    }
+
+    override fun dispose() {
+        while (true) {
+            val instance = tryPop() ?: return
+            disposeInstance(instance)
+        }
+    }
+
+    private fun tryPush(instance: T): Boolean {
+        var index = ((System.identityHashCode(instance) * MAGIC) ushr shift) + 1
+        repeat (PROBE_COUNT) {
+            if (instances.compareAndSet(index, null, instance)) {
+                pushTop(index)
+                return true
+            }
+            if (--index == 0) index = maxIndex
+        }
+        return false
+    }
+
+    private fun tryPop(): T? {
+        val index = popTop()
+        return if (index == 0) null else instances.getAndSet(index, null)
+    }
+
+    private fun pushTop(index: Int) {
+        require(index > 0)
+        while (true) { // lock-free loop on top
+            val top = this.top // volatile read
+            val topVersion = (top shr 32 and 0xffffffffL) + 1L
+            val topIndex = (top and 0xffffffffL).toInt()
+            val newTop = topVersion shl 32 or index.toLong()
+            next[index] = topIndex
+            if (Top.compareAndSet(this, top, newTop)) return
+        }
+    }
+
+    private fun popTop(): Int {
+        while (true) { // lock-free loop on top
+            val top = this.top // volatile read
+            if (top == 0L) return 0
+            val newVersion = (top shr 32 and 0xffffffffL) + 1L
+            val topIndex = (top and 0xffffffffL).toInt()
+            if (topIndex == 0) return 0
+            val next = next[topIndex]
+            val newTop = newVersion shl 32 or next.toLong()
+            if (Top.compareAndSet(this, top, newTop)) return topIndex
+        }
+    }
+
+    companion object {
+        // todo: replace with atomicfu, remove companion object
+        private val Top = longUpdater(ObjectPoolImpl<*>::top)
+    }
+}
+
+// ------------- NoPoolImpl -------------
+
+internal abstract class NoPoolImpl<T : Any> : ObjectPool<T> {
+    override val capacity: Int get() = 0
+    override abstract fun borrow(): T
+    override fun recycle(instance: T) {}
+    override fun dispose() {}
+}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/ReadWriteBufferState.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/ReadWriteBufferState.kt
new file mode 100644
index 0000000..37ab97a
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/ReadWriteBufferState.kt
@@ -0,0 +1,91 @@
+package kotlinx.coroutines.experimental.io.internal
+
+import java.nio.ByteBuffer
+
+// this is MAGICAL constant that is tied to the code ByteBufferChannel (that is how much it needs extra)
+internal const val RESERVED_SIZE = 8
+
+internal val EmptyByteBuffer: ByteBuffer = ByteBuffer.allocate(0)
+internal val EmptyCapacity = RingBufferCapacity(0)
+
+internal sealed class ReadWriteBufferState(
+    @JvmField val backingBuffer: ByteBuffer,
+    @JvmField val capacity: RingBufferCapacity
+) {
+    open val idle: Boolean get() = false
+    open val readBuffer: ByteBuffer get() = error("read buffer is not available in state $this")
+    open val writeBuffer: ByteBuffer get() = error("write buffer is not available in state $this")
+
+    internal open fun startReading(): ReadWriteBufferState = error("Reading is not available in state $this")
+    internal open fun startWriting(): ReadWriteBufferState = error("Writing is not available in state $this")
+    internal open fun stopReading(): ReadWriteBufferState = error("Unable to stop reading in state $this")
+    internal open fun stopWriting(): ReadWriteBufferState = error("Unable to stop writing in state $this")
+
+    object IdleEmpty : ReadWriteBufferState(EmptyByteBuffer, EmptyCapacity) {
+        override val idle: Boolean get() = true
+        override fun toString() = "IDLE(empty)"
+    }
+
+    class Initial(
+        backingBuffer: ByteBuffer,
+        reservedSize: Int = RESERVED_SIZE
+    ) : ReadWriteBufferState(backingBuffer, RingBufferCapacity(backingBuffer.capacity() - reservedSize)) {
+        init {
+            require(backingBuffer.position() == 0)
+            require(backingBuffer.limit() == backingBuffer.capacity())
+        }
+        override val writeBuffer: ByteBuffer = backingBuffer.duplicate() // defensive copy of buffer's state
+        override val readBuffer: ByteBuffer = backingBuffer.duplicate() // must have a separate buffer state here
+        // all other possible states
+        internal val idleState = IdleNonEmpty(this)
+        internal val readingState = Reading(this)
+        internal val writingState = Writing(this)
+        internal val readingWritingState = ReadingWriting(this)
+        // state transitions
+        override fun startReading() = readingState
+        override fun startWriting() = writingState
+        override val idle: Boolean get() = error("Not available for initial state")
+        override fun toString() = "Initial"
+    }
+
+    class IdleNonEmpty internal constructor(
+        val initial: Initial // public here, so can release initial state when idle
+    ) : ReadWriteBufferState(initial.backingBuffer, initial.capacity) {
+        override fun startReading() = initial.readingState
+        override fun startWriting() = initial.writingState
+        override val idle: Boolean get() = true
+        override fun toString() = "IDLE(with buffer)"
+    }
+
+    class Reading internal constructor(
+        private val initial: Initial
+    ) : ReadWriteBufferState(initial.backingBuffer, initial.capacity) {
+        override val readBuffer: ByteBuffer get() = initial.readBuffer
+        override fun startWriting() = initial.readingWritingState
+        override fun stopReading() = initial.idleState
+        override fun toString() = "Reading"
+    }
+
+    class Writing internal constructor(
+        private val initial: Initial
+    ) : ReadWriteBufferState(initial.backingBuffer, initial.capacity) {
+        override val writeBuffer: ByteBuffer get() = initial.writeBuffer
+        override fun startReading() = initial.readingWritingState
+        override fun stopWriting() = initial.idleState
+        override fun toString() = "Writing"
+    }
+
+    class ReadingWriting internal constructor(
+        private val initial: Initial
+    ) : ReadWriteBufferState(initial.backingBuffer, initial.capacity) {
+        override val readBuffer: ByteBuffer get() = initial.readBuffer
+        override val writeBuffer: ByteBuffer get() = initial.writeBuffer
+        override fun stopReading() = initial.writingState
+        override fun stopWriting() = initial.readingState
+        override fun toString() = "Reading+Writing"
+    }
+
+    object Terminated : ReadWriteBufferState(EmptyByteBuffer, EmptyCapacity) {
+        override fun toString() = "Terminated"
+    }
+}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/RingBufferCapacity.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/RingBufferCapacity.kt
new file mode 100644
index 0000000..529b817
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/RingBufferCapacity.kt
@@ -0,0 +1,109 @@
+package kotlinx.coroutines.experimental.io.internal
+
+internal class RingBufferCapacity(private val totalCapacity: Int) {
+    @Volatile @JvmField
+    var availableForRead = 0
+
+    @Volatile @JvmField
+    var availableForWrite = totalCapacity
+
+    @Volatile @JvmField
+    var pendingToFlush = 0
+
+    // concurrent unsafe!
+    fun resetForWrite() {
+        availableForRead = 0
+        availableForWrite = totalCapacity
+        pendingToFlush = 0
+    }
+
+    fun resetForRead() {
+        availableForRead = totalCapacity
+        availableForWrite = 0
+        pendingToFlush = 0
+    }
+
+    fun tryReadExact(n: Int): Boolean {
+        while (true) {
+            val remaining = availableForRead
+            if (remaining < n) return false
+            if (AvailableForRead.compareAndSet(this, remaining, remaining - n)) return true
+        }
+    }
+
+    fun tryReadAtMost(n: Int): Int {
+        while (true) {
+            val remaining = availableForRead
+            val delta = minOf(n, remaining)
+            if (delta == 0) return 0
+            if (AvailableForRead.compareAndSet(this, remaining, remaining - delta)) return delta
+        }
+    }
+
+    fun tryWriteExact(n: Int): Boolean {
+        while (true) {
+            val remaining = availableForWrite
+            if (remaining < n) return false
+            if (AvailableForWrite.compareAndSet(this, remaining, remaining - n)) return true
+        }
+    }
+
+    fun tryWriteAtMost(n: Int): Int {
+        while (true) {
+            val remaining = availableForWrite
+            val delta = minOf(n, remaining)
+            if (delta == 0) return 0
+            if (AvailableForWrite.compareAndSet(this, remaining, remaining - delta)) return delta
+        }
+    }
+
+    fun completeRead(n: Int) {
+        while (true) {
+            val remaining = availableForWrite
+            val update = remaining + n
+            require(update <= totalCapacity) { "Completed read overflow: $remaining + $n = $update > $totalCapacity" }
+            if (AvailableForWrite.compareAndSet(this, remaining, update)) break
+        }
+    }
+
+    fun completeWrite(n: Int) {
+        while (true) {
+            val pending = pendingToFlush
+            val update = pending + n
+            require(update <= totalCapacity) { "Complete write overflow: $pending + $n > $totalCapacity" }
+            if (PendingToFlush.compareAndSet(this, pending, update)) break
+        }
+    }
+
+    /**
+     * @return true if there are bytes available for read after flush
+     */
+    fun flush(): Boolean {
+        val pending = PendingToFlush.getAndSet(this, 0)
+        while (true) {
+            val remaining = availableForRead
+            val update = remaining + pending
+            if (remaining == update || AvailableForRead.compareAndSet(this, remaining, update)) {
+                return update > 0
+            }
+        }
+    }
+
+    fun tryLockForRelease(): Boolean {
+        while (true) {
+            val remaining = availableForWrite
+            if (pendingToFlush > 0 || availableForRead > 0 || remaining != totalCapacity) return false
+            if (AvailableForWrite.compareAndSet(this, remaining, 0)) return true
+        }
+    }
+
+    fun isEmpty(): Boolean = availableForWrite == totalCapacity
+    fun isFull(): Boolean = availableForWrite == 0
+
+    companion object {
+        // todo: replace with atomicfu, remove companion object
+        private val AvailableForRead = intUpdater(RingBufferCapacity::availableForRead)
+        private val AvailableForWrite = intUpdater(RingBufferCapacity::availableForWrite)
+        private val PendingToFlush = intUpdater(RingBufferCapacity::pendingToFlush)
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/Strings.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/Strings.kt
new file mode 100644
index 0000000..06161ab
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/Strings.kt
@@ -0,0 +1,108 @@
+package kotlinx.coroutines.experimental.io.internal
+
+import java.nio.*
+import java.nio.charset.*
+
+/**
+ * Decodes all the bytes to ASCII characters until end of buffer applying every character to [consumer]
+ * It stops processing if a non-ascii character encountered and returns `false`
+ * @return `false` if a non-ascii character encountered or `true` if all bytes were processed
+ */
+internal inline fun ByteBuffer.decodeASCII(consumer: (Char) -> Boolean): Boolean {
+    while (hasRemaining()) {
+        val v = get().toInt() and 0xff
+        if (v and 0x80 != 0 || !consumer(v.toChar())) {
+            position(position() - 1)
+            return false
+        }
+    }
+
+    return true
+}
+
+/**
+ * Decodes all the bytes to utf8 applying every character on [consumer] until or consumer return `false`.
+ * If a consumer returned false then a character will be pushed back (including all surrogates will be pushed back as well)
+ * and [decodeUTF8] returns 0
+ * @return number of bytes required to decode incomplete utf8 character or 0 if all bytes were processed
+ * or -1 if consumer rejected loop
+ */
+internal inline fun ByteBuffer.decodeUTF8(consumer: (Char) -> Boolean): Int {
+    var byteCount = 0
+    var value = 0
+    var lastByteCount = 0
+
+    while (hasRemaining()) {
+        val v = get().toInt() and 0xff
+        when {
+            v and 0x80 == 0 -> {
+                if (byteCount != 0) throw MalformedInputException(0)
+                if (!consumer(v.toChar())) {
+                    position(position() - 1)
+                    return -1
+                }
+            }
+            byteCount == 0 -> {
+                // first unicode byte
+
+                var mask = 0x80
+                value = v
+
+                for (i in 1..6) { // TODO do we support 6 bytes unicode?
+                    if (value and mask != 0) {
+                        value = value and mask.inv()
+                        mask = mask shr 1
+                        byteCount++
+                    } else {
+                        break
+                    }
+                }
+
+                lastByteCount = byteCount
+                byteCount--
+
+                if (byteCount > remaining()) {
+                    position(position() - 1) // return one byte back
+                    return lastByteCount
+                }
+            }
+            else -> {
+                // trailing unicode byte
+                value = (value shl 6) or (v and 0x7f)
+                byteCount--
+
+                if (byteCount == 0) {
+                    if (isBmpCodePoint(value)) {
+                        if (!consumer(value.toChar())) {
+                            position(position() - lastByteCount)
+                            return -1
+                        }
+                    } else if (!isValidCodePoint(value)) {
+                        throw IllegalArgumentException("Malformed code-point ${Integer.toHexString(value)} found")
+                    } else {
+                        if (!consumer(highSurrogate(value).toChar()) ||
+                            !consumer(lowSurrogate(value).toChar())) {
+                            position(position() - lastByteCount)
+                            return -1
+                        }
+                    }
+
+                    value = 0
+                }
+            }
+        }
+    }
+
+    return 0
+}
+
+private const val MaxCodePoint = 0X10ffff
+private const val MinLowSurrogate = 0xdc00
+private const val MinHighSurrogate = 0xd800
+private const val MinSupplementary = 0x10000
+private const val HighSurrogateMagic = MinHighSurrogate - (MinSupplementary ushr 10)
+
+private fun isBmpCodePoint(cp: Int) = cp ushr 16 == 0
+private fun isValidCodePoint(codePoint: Int) = codePoint <= MaxCodePoint
+private fun lowSurrogate(cp: Int) = (cp and 0x3ff) + MinLowSurrogate
+private fun highSurrogate(cp: Int) = (cp ushr 10) + HighSurrogateMagic
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/Utils.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/Utils.kt
new file mode 100644
index 0000000..a5be826
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/internal/Utils.kt
@@ -0,0 +1,26 @@
+package kotlinx.coroutines.experimental.io.internal
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
+import java.util.concurrent.atomic.AtomicLongFieldUpdater
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+import kotlin.reflect.KProperty1
+
+internal fun ByteBuffer.isEmpty() = !hasRemaining()
+
+internal inline fun <reified Owner : Any> longUpdater(p: KProperty1<Owner, Long>): AtomicLongFieldUpdater<Owner> {
+    return AtomicLongFieldUpdater.newUpdater(Owner::class.java, p.name)
+}
+
+internal inline fun <reified Owner : Any> intUpdater(p: KProperty1<Owner, Int>): AtomicIntegerFieldUpdater<Owner> {
+    return AtomicIntegerFieldUpdater.newUpdater(Owner::class.java, p.name)
+}
+
+internal inline fun <reified Owner : Any, reified T> updater(p: KProperty1<Owner, T>): AtomicReferenceFieldUpdater<Owner, T> {
+    return AtomicReferenceFieldUpdater.newUpdater(Owner::class.java, T::class.java, p.name)
+}
+
+internal fun getIOIntProperty(name: String, default: Int): Int =
+    try { System.getProperty("kotlinx.coroutines.io.$name") }
+    catch (e: SecurityException) { null }
+        ?.toIntOrNull() ?: default
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacket.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacket.kt
new file mode 100644
index 0000000..62ce16b
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacket.kt
@@ -0,0 +1,38 @@
+package kotlinx.coroutines.experimental.io.packet
+
+import java.io.*
+import java.nio.*
+import kotlin.experimental.*
+
+interface ByteReadPacket {
+    val remaining: Int
+
+    fun readLazy(dst: ByteArray, offset: Int, length: Int): Int
+    fun readLazy(dst: ByteArray) = readLazy(dst, 0, dst.size)
+    fun readLazy(dst: ByteBuffer): Int
+
+    fun readFully(dst: ByteArray, offset: Int, length: Int)
+    fun readFully(dst: ByteArray) = readFully(dst, 0, dst.size)
+    fun readFully(dst: ByteBuffer): Int
+
+    fun readLong(): Long
+    fun readInt(): Int
+    fun readShort(): Short
+    fun readByte(): Byte
+
+    fun readUInt(): Long = readInt().toLong() and 0xffffffff
+    fun readUShort(): Int = readShort().toInt() and 0xffff
+    fun readUByte(): Short = readByte().toShort() and 0xff
+
+    fun readDouble(): Double
+    fun readFloat(): Float
+
+    fun skip(n: Int): Int
+    fun skipExact(n: Int)
+
+    fun release()
+
+    fun readUTF8LineTo(out: Appendable, limit: Int = Int.MAX_VALUE): Boolean
+
+    fun inputStream(): InputStream
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketEmpty.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketEmpty.kt
new file mode 100644
index 0000000..c278890
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketEmpty.kt
@@ -0,0 +1,80 @@
+package kotlinx.coroutines.experimental.io.packet
+
+import java.io.*
+import java.nio.*
+
+object ByteReadPacketEmpty : ByteReadPacket {
+    override val remaining: Int
+        get() = 0
+
+    override fun readLazy(dst: ByteArray, offset: Int, length: Int) = -1
+    override fun readLazy(dst: ByteBuffer): Int = -1
+
+    override fun readFully(dst: ByteArray, offset: Int, length: Int) {
+        if (length > 0) throw EOFException("Couldn't read $length bytes from empty packet")
+    }
+
+    override fun readFully(dst: ByteBuffer): Int {
+        if (dst.hasRemaining()) throw EOFException("Couldn't read ${dst.remaining()} bytes from empty packet")
+        return 0
+    }
+
+    override fun readLong(): Long {
+        throw EOFException("Couldn't read long from empty packet")
+    }
+
+    override fun readInt(): Int {
+        throw EOFException("Couldn't read int from empty packet")
+    }
+
+    override fun readShort(): Short {
+        throw EOFException("Couldn't read short from empty packet")
+    }
+
+    override fun readByte(): Byte {
+        throw EOFException("Couldn't read byte from empty packet")
+    }
+
+    override fun readDouble(): Double {
+        throw EOFException("Couldn't read double from empty packet")
+    }
+
+    override fun readFloat(): Float {
+        throw EOFException("Couldn't read float from empty packet")
+    }
+
+    override fun skip(n: Int): Int {
+        return 0
+    }
+
+    override fun skipExact(n: Int) {
+        if (n != 0) throw EOFException("Couldn't skip $n bytes in empty packet")
+    }
+
+    override fun release() {
+    }
+
+    override fun readUTF8LineTo(out: Appendable, limit: Int) = false
+
+    override fun inputStream() = EmptyInputStream
+
+    private val EmptyInputStream = object : InputStream() {
+        override fun available() = 0
+
+        override fun read(): Int = -1
+        override fun read(b: ByteArray?, off: Int, len: Int): Int = -1
+        override fun read(b: ByteArray?) = -1
+
+        override fun skip(n: Long) = 0L
+
+        override fun markSupported() = true
+        override fun mark(readlimit: Int) {
+        }
+
+        override fun reset() {
+        }
+
+        override fun close() {
+        }
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt
new file mode 100644
index 0000000..dd9fad2
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt
@@ -0,0 +1,240 @@
+@file:Suppress("UsePropertyAccessSyntax")
+
+package kotlinx.coroutines.experimental.io.packet
+
+import kotlinx.coroutines.experimental.io.internal.*
+import java.io.*
+import java.nio.*
+import java.nio.charset.*
+import java.util.*
+
+internal class ByteReadPacketImpl(private val packets: ArrayDeque<ByteBuffer>, internal val pool: ObjectPool<ByteBuffer>) : ByteReadPacket {
+    override val remaining: Int
+        get() = packets.sumBy { it.remaining() }
+
+    internal fun steal(): ByteBuffer = packets.pollFirst() ?: throw IllegalStateException("EOF")
+
+    override fun readLazy(dst: ByteArray, offset: Int, length: Int): Int {
+        var copied = 0
+
+        val rc = reading(0) { buffer ->
+            val size = minOf(buffer.remaining(), length - copied)
+            buffer.get(dst, offset + copied, size)
+            copied += size
+
+            copied < length
+        }
+
+        return if (rc) copied else -1
+    }
+
+    override fun readLazy(dst: ByteBuffer): Int {
+        var copied = 0
+
+        val rc = reading(0) { buffer ->
+            if (dst.remaining() >= buffer.remaining()) {
+                copied += buffer.remaining()
+                dst.put(buffer)
+            } else {
+                while (dst.hasRemaining() && buffer.hasRemaining()) {
+                    dst.put(buffer.get())
+                    copied++
+                }
+            }
+
+            dst.hasRemaining()
+        }
+
+        return if (rc) copied else -1
+    }
+
+    override fun readFully(dst: ByteArray, offset: Int, length: Int) {
+        val rc = readLazy(dst, offset, length)
+        if (rc < length) throw EOFException("Not enough bytes in the packet")
+    }
+
+    override fun readFully(dst: ByteBuffer): Int {
+        val rc = readLazy(dst)
+        if (dst.hasRemaining()) throw EOFException("Not enough bytes in the packet")
+        return rc
+    }
+
+    override fun readLong(): Long {
+        var v = 0L
+        val rc = reading(8) { v = it.getLong(); false }
+        if (!rc) throw EOFException("Couldn't read long from empty packet")
+        return v
+    }
+
+    override fun readInt(): Int {
+        var v = 0
+        val rc = reading(4) { v = it.getInt(); false }
+        if (!rc) throw EOFException("Couldn't read int from empty packet")
+        return v
+    }
+
+    override fun readShort(): Short {
+        var v: Short = 0
+        val rc = reading(2) { v = it.getShort(); false }
+        if (!rc) throw EOFException("Couldn't read short from empty packet")
+        return v
+    }
+
+    override fun readByte(): Byte {
+        var v: Byte = 0
+        val rc = reading(1) { v = it.get(); false }
+        if (!rc) throw EOFException("Couldn't read byte from empty packet")
+        return v
+    }
+
+    override fun readDouble(): Double {
+        var v = 0.0
+        val rc = reading(8) { v = it.getDouble(); false }
+        if (!rc) throw EOFException("Couldn't read double from empty packet")
+        return v
+    }
+
+    override fun readFloat(): Float {
+        var v = 0.0f
+        val rc = reading(4) { v = it.getFloat(); false }
+        if (!rc) throw EOFException("Couldn't read float from empty packet")
+        return v
+    }
+
+    override fun readUTF8LineTo(out: Appendable, limit: Int): Boolean {
+        var decoded = 0
+        var size = 1
+        var cr = false
+        var end = false
+
+        val rc = reading(size) { bb ->
+            size = bb.decodeUTF8 { ch ->
+                when (ch) {
+                    '\r' -> {
+                        if (cr) {
+                            end = true
+                            return@decodeUTF8 false
+                        }
+                        cr = true
+                        true
+                    }
+                    '\n' -> {
+                        return true
+                    }
+                    else -> {
+                        if (cr) {
+                            end = true
+                            return@decodeUTF8 false
+                        }
+
+                        if (decoded == limit) throw BufferOverflowException()
+                        decoded++
+                        out.append(ch)
+                        true
+                    }
+                }
+            }
+
+            !end && size == 0
+        }
+
+        if (!rc && size != 0) throw MalformedInputException(0)
+
+        return rc
+    }
+
+    override fun skip(n: Int): Int {
+        var skipped = 0
+
+        reading(0) {
+            val m = minOf(n - skipped, it.remaining())
+            it.position(it.position() + m)
+            skipped += m
+
+            skipped < n
+        }
+
+        return skipped
+    }
+
+    override fun skipExact(n: Int) {
+        if (skip(n) != n) throw EOFException("Unable to skip $n bytes due to end of packet")
+    }
+
+    override fun inputStream(): InputStream {
+        return object : InputStream() {
+            override fun read(): Int {
+                var v: Byte = 0
+                val rc = reading(1) { v = it.get(); true }
+                return if (rc) v.toInt() and 0xff else -1
+            }
+
+            override fun read(b: ByteArray, off: Int, len: Int) = readLazy(b, off, len)
+            override fun skip(n: Long): Long {
+                if (n > Int.MAX_VALUE) return this@ByteReadPacketImpl.skip(Int.MAX_VALUE).toLong()
+                return this@ByteReadPacketImpl.skip(n.toInt()).toLong()
+            }
+
+            override fun available() = remaining
+        }
+    }
+
+    override fun release() {
+        while (packets.isNotEmpty()) {
+            recycle(packets.remove())
+        }
+    }
+
+    private inline fun reading(size: Int, block: (ByteBuffer) -> Boolean): Boolean {
+        if (packets.isEmpty()) return false
+
+        var visited = false
+        var buffer = packets.peekFirst()
+        var stop = false
+
+        while (!stop) {
+            if (buffer.hasRemaining()) {
+                if (buffer.remaining() < size) {
+                    if (!tryStealBytesFromNextBuffer(size, buffer)) return false
+                }
+
+                visited = true
+                stop = !block(buffer)
+            }
+
+            if (!buffer.hasRemaining()) {
+                packets.removeFirst()
+                recycle(buffer)
+
+                if (packets.isEmpty()) break
+                buffer = packets.peekFirst()
+            }
+        }
+
+        return visited
+    }
+
+    private fun tryStealBytesFromNextBuffer(size: Int, buffer: ByteBuffer): Boolean {
+        if (packets.size == 1) {
+            return false
+        }
+
+        packets.removeFirst()
+
+        val extraBytes = size - buffer.remaining()
+        val next = packets.peekFirst()
+
+        buffer.compact()
+        repeat(extraBytes) {
+            buffer.put(next.get())
+        }
+        buffer.flip()
+
+        packets.addFirst(buffer)
+        return true
+    }
+
+    private fun recycle(buffer: ByteBuffer) {
+        pool.recycle(buffer)
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketSingle.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketSingle.kt
new file mode 100644
index 0000000..5b4c28b
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketSingle.kt
@@ -0,0 +1,200 @@
+package kotlinx.coroutines.experimental.io.packet
+
+import kotlinx.coroutines.experimental.io.internal.*
+import java.io.*
+import java.nio.*
+import java.nio.charset.*
+
+internal class ByteReadPacketSingle(private var buffer: ByteBuffer?, internal val pool: ObjectPool<ByteBuffer>) : ByteReadPacket {
+    override val remaining: Int
+        get() = buffer?.remaining() ?: 0
+
+    internal fun steal(): ByteBuffer = buffer?.also { buffer = null } ?: throw IllegalStateException("EOF")
+
+    override fun readLazy(dst: ByteArray, offset: Int, length: Int): Int {
+        var copied = 0
+
+        val rc = reading { buffer ->
+            val size = minOf(buffer.remaining(), length - copied)
+            buffer.get(dst, offset, size)
+            copied += size
+
+            if (copied == length) return copied
+        }
+
+        return if (rc) copied else -1
+    }
+
+    override fun readLazy(dst: ByteBuffer): Int {
+        var copied = 0
+
+        val rc = reading { buffer ->
+            if (dst.remaining() >= buffer.remaining()) {
+                copied += buffer.remaining()
+                dst.put(buffer)
+            } else {
+                while (dst.hasRemaining() && buffer.hasRemaining()) {
+                    dst.put(buffer.get())
+                    copied++
+                }
+            }
+
+            if (!dst.hasRemaining()) return copied
+        }
+
+        return if (rc) copied else -1
+    }
+
+    override fun readFully(dst: ByteArray, offset: Int, length: Int) {
+        val rc = readLazy(dst, offset, length)
+        if (rc < length) throw EOFException("Not enough bytes in the packet")
+    }
+
+    override fun readFully(dst: ByteBuffer): Int {
+        val rc = readLazy(dst)
+        if (dst.hasRemaining()) throw EOFException("Not enough bytes in the packet")
+        return rc
+    }
+
+    override fun readLong(): Long {
+        var v = 0L
+        val rc = reading { v = it.getLong() }
+        if (!rc) throw EOFException("Couldn't read long from empty packet")
+        return v
+    }
+
+    override fun readInt(): Int {
+        var v = 0
+        val rc = reading { v = it.getInt() }
+        if (!rc) throw EOFException("Couldn't read int from empty packet")
+        return v
+    }
+
+    override fun readShort(): Short {
+        var v: Short = 0
+        val rc = reading { v = it.getShort() }
+        if (!rc) throw EOFException("Couldn't read short from empty packet")
+        return v
+    }
+
+    override fun readByte(): Byte {
+        var v: Byte = 0
+        val rc = reading { v = it.get() }
+        if (!rc) throw EOFException("Couldn't read byte from empty packet")
+        return v
+    }
+
+    override fun readDouble(): Double {
+        var v = 0.0
+        val rc = reading { v = it.getDouble() }
+        if (!rc) throw EOFException("Couldn't read double from empty packet")
+        return v
+    }
+
+    override fun readFloat(): Float {
+        var v = 0.0f
+        val rc = reading { v = it.getFloat() }
+        if (!rc) throw EOFException("Couldn't read float from empty packet")
+        return v
+    }
+
+    override fun readUTF8LineTo(out: Appendable, limit: Int): Boolean {
+        var decoded = 0
+        var cr = false
+
+        return reading { bb ->
+            val rc = bb.decodeUTF8 { ch ->
+                when (ch) {
+                    '\r' -> {
+                        if (cr) {
+                            false
+                        } else {
+                            cr = true
+                            true
+                        }
+                    }
+                    '\n' ->  false
+                    else -> {
+                        if (cr) {
+                            false
+                        } else {
+                            if (decoded == limit) throw BufferOverflowException()
+                            decoded++
+                            out.append(ch)
+                            true
+                        }
+                    }
+                }
+            }
+
+            if (rc == -1) {
+                val v = bb.get()
+                if (v != 0x0a.toByte() && v != 0x0d.toByte()) {
+                    bb.position(bb.position() - 1)
+                }
+            } else if (rc > 0) throw MalformedInputException(0)
+        }
+    }
+
+    override fun skip(n: Int): Int {
+        var skipped = 0
+
+        reading {
+            val m = minOf(n - skipped, it.remaining())
+            it.position(it.position() + m)
+            skipped += m
+        }
+
+        return skipped
+    }
+
+    override fun skipExact(n: Int) {
+        if (skip(n) != n) throw EOFException("Unable to skip $n bytes due to end of packet")
+    }
+
+    override fun inputStream(): InputStream {
+        return object : InputStream() {
+            override fun read(): Int {
+                var v: Byte = 0
+                val rc = reading { v = it.get() }
+                return if (rc) v.toInt() and 0xff else -1
+            }
+
+            override fun read(b: ByteArray, off: Int, len: Int) = readLazy(b, off, len)
+            override fun skip(n: Long): Long {
+                if (n > Int.MAX_VALUE) return this@ByteReadPacketSingle.skip(Int.MAX_VALUE).toLong()
+                return this@ByteReadPacketSingle.skip(n.toInt()).toLong()
+            }
+
+            override fun available() = remaining
+        }
+    }
+
+    override fun release() {
+        recycle(buffer ?: return)
+        buffer = null
+    }
+
+    private inline fun reading(block: (ByteBuffer) -> Unit): Boolean {
+        val buffer = buffer ?: return false
+
+        if (!buffer.hasRemaining()) {
+            this.buffer = null
+            recycle(buffer)
+            return false
+        }
+
+        block(buffer)
+
+        if (!buffer.hasRemaining()) {
+            this.buffer = null
+            recycle(buffer)
+        }
+
+        return true
+    }
+
+    private fun recycle(buffer: ByteBuffer) {
+        pool.recycle(buffer)
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacket.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacket.kt
new file mode 100644
index 0000000..564be91
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacket.kt
@@ -0,0 +1,31 @@
+package kotlinx.coroutines.experimental.io.packet
+
+import java.io.*
+import java.nio.*
+
+interface ByteWritePacket : Appendable {
+    fun writeFully(src: ByteArray, offset: Int, length: Int)
+    fun writeFully(src: ByteArray) = writeFully(src, 0, src.size)
+    fun writeFully(src: ByteBuffer)
+
+    fun writeLong(l: Long)
+    fun writeInt(i: Int)
+    fun writeShort(s: Short)
+    fun writeByte(b: Byte)
+    fun writeDouble(d: Double)
+    fun writeFloat(f: Float)
+
+    fun writeStringUtf8(s: String)
+    fun writeStringUtf8(cb: CharBuffer)
+    fun writeStringUtf8(cs: CharSequence)
+
+    fun release()
+    fun build(): ByteReadPacket
+
+    fun outputStream(): OutputStream
+
+    override fun append(csq: CharSequence): ByteWritePacket {
+        append(csq, 0, csq.length)
+        return this
+    }
+}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacketImpl.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacketImpl.kt
new file mode 100644
index 0000000..562c27a
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteWritePacketImpl.kt
@@ -0,0 +1,208 @@
+package kotlinx.coroutines.experimental.io.packet
+
+import kotlinx.coroutines.experimental.io.internal.*
+import java.io.*
+import java.nio.*
+import java.util.*
+
+internal class ByteWritePacketImpl(private val pool: ObjectPool<ByteBuffer>) : ByteWritePacket {
+    private var buffers: Any? = null
+
+    override fun writeFully(src: ByteArray, offset: Int, length: Int) {
+        var copied = 0
+
+        while (copied < length) {
+            write(1) { buffer ->
+                val size = minOf(buffer.remaining(), length - copied)
+                buffer.put(src, offset + copied, size)
+                copied += size
+            }
+        }
+    }
+
+    override fun writeFully(src: ByteBuffer) {
+        while (src.hasRemaining()) {
+            write(1) { buffer ->
+                if (buffer.remaining() >= src.remaining()) {
+                    buffer.put(src)
+                } else {
+                    while (buffer.hasRemaining() && src.hasRemaining()) {
+                        buffer.put(src.get())
+                    }
+                }
+            }
+        }
+    }
+
+    override fun writeLong(l: Long) {
+        write(8) { it.putLong(l) }
+    }
+
+    override fun writeInt(i: Int) {
+        write(4) { it.putInt(i) }
+    }
+
+    override fun writeShort(s: Short) {
+        write(2) { it.putShort(s) }
+    }
+
+    override fun writeByte(b: Byte) {
+        write(1) { it.put(b) }
+    }
+
+    override fun writeDouble(d: Double) {
+        write(8) { it.putDouble(d) }
+    }
+
+    override fun writeFloat(f: Float) {
+        write(4) { it.putFloat(f) }
+    }
+
+    override fun append(c: Char): ByteWritePacket {
+        write(3) {
+            it.putUtf8Char(c.toInt() and 0xffff)
+        }
+        return this
+    }
+
+    override fun append(csq: CharSequence, start: Int, end: Int): ByteWritePacket {
+        val length = end - start
+        var w = 0
+        var requiredSize = 1
+
+        while (w < length) {
+            write(requiredSize) { bb ->
+                while (w < length) {
+                    val ch = csq[start + w]
+                    val v = ch.toInt() and 0xffff
+
+                    requiredSize = when {
+                        v in 1..0x7f -> 1
+                        v > 0x7ff -> 3
+                        else -> 2
+                    }
+
+                    if (bb.remaining() >= requiredSize) {
+                        bb.putUtf8Char(v)
+                        w++
+                    } else {
+                        break
+                    }
+                }
+            }
+        }
+
+        return this
+    }
+
+    override fun writeStringUtf8(s: String) {
+        append(s, 0, s.length)
+    }
+
+    override fun writeStringUtf8(cs: CharSequence) {
+        append(cs, 0, cs.length)
+    }
+
+    override fun writeStringUtf8(cb: CharBuffer) {
+        append(cb, 0, cb.remaining())
+    }
+
+    @Suppress("NOTHING_TO_INLINE")
+    private inline fun ByteBuffer.putUtf8Char(v: Int) {
+        when {
+            v in 1..0x7f -> put(v.toByte())
+            v > 0x7ff -> {
+                put((0xe0 or ((v shr 12) and 0x0f)).toByte())
+                put((0x80 or ((v shr  6) and 0x3f)).toByte())
+                put((0x80 or ( v         and 0x3f)).toByte())
+            }
+            else -> {
+                put((0xc0 or ((v shr  6) and 0x1f)).toByte())
+                put((0x80 or ( v         and 0x3f)).toByte())
+            }
+        }
+    }
+
+    override fun outputStream(): OutputStream {
+        return object : OutputStream() {
+            override fun write(b: Int) {
+                writeByte(b.toByte())
+            }
+
+            override fun write(b: ByteArray, off: Int, len: Int) {
+                writeFully(b, off, len)
+            }
+        }
+    }
+
+    override fun build(): ByteReadPacket {
+        val bs = buffers ?: return ByteReadPacketEmpty
+        buffers = null
+
+        return if (bs is ArrayDeque<*>) {
+            @Suppress("UNCHECKED_CAST")
+            when {
+                bs.isEmpty() -> ByteReadPacketEmpty
+                bs.size == 1 -> ByteReadPacketSingle((bs.first as ByteBuffer).also { it.flip() }, pool)
+                else -> ByteReadPacketImpl((bs as ArrayDeque<ByteBuffer>).also {
+                    for (b in bs) {
+                        b.flip()
+                    }
+                }, pool)
+            }
+        } else {
+            ByteReadPacketSingle((bs as ByteBuffer).also { it.flip() }, pool)
+        }
+    }
+
+    override fun release() {
+        val bs = buffers ?: return
+        buffers = null
+
+        if (bs is ArrayDeque<*>) {
+            for (o in bs) {
+                recycle(o as ByteBuffer)
+            }
+        } else {
+            recycle(bs as ByteBuffer)
+        }
+    }
+
+    private inline fun write(size: Int, block: (ByteBuffer) -> Unit) {
+        val buffer = last()?.takeIf { it.remaining() >= size }
+
+        if (buffer == null) {
+            val new = pool.borrow()
+            last(new)
+            new.clear()
+            block(new)
+        } else {
+            block(buffer)
+        }
+    }
+
+    private fun last(): ByteBuffer? = buffers?.let { b ->
+        @Suppress("UNCHECKED_CAST")
+        when (b) {
+            is ByteBuffer -> b
+            is ArrayDeque<*> -> (b as ArrayDeque<ByteBuffer>).takeIf { it.isNotEmpty() }?.peekLast()
+            else -> null
+        }
+    }
+
+    private fun last(new: ByteBuffer) {
+        @Suppress("UNCHECKED_CAST")
+        if (buffers is ArrayDeque<*>) (buffers as ArrayDeque<ByteBuffer>).addLast(new)
+        else if (buffers == null) buffers = new
+        else {
+            val dq = ArrayDeque<ByteBuffer>()
+            dq.addFirst(buffers as ByteBuffer)
+            dq.addLast(new)
+            buffers = dq
+        }
+    }
+
+    private fun recycle(buffer: ByteBuffer) {
+        pool.recycle(buffer)
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/Packets.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/Packets.kt
new file mode 100644
index 0000000..26ebba4
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/Packets.kt
@@ -0,0 +1,22 @@
+package kotlinx.coroutines.experimental.io.packet
+
+import kotlinx.coroutines.experimental.io.internal.ObjectPool
+import kotlinx.coroutines.experimental.io.internal.ObjectPoolImpl
+import kotlinx.coroutines.experimental.io.internal.getIOIntProperty
+import java.nio.ByteBuffer
+
+private val PACKET_BUFFER_SIZE = getIOIntProperty("PacketBufferSize", 4096)
+private val PACKET_BUFFER_POOL_SIZE = getIOIntProperty("PacketBufferPoolSize", 128)
+
+private val PacketBufferPool: ObjectPool<ByteBuffer> =
+    object : ObjectPoolImpl<ByteBuffer>(PACKET_BUFFER_POOL_SIZE) {
+        override fun produceInstance(): ByteBuffer = ByteBuffer.allocateDirect(PACKET_BUFFER_SIZE)
+    }
+
+fun buildPacket(block: ByteWritePacket.() -> Unit): ByteReadPacket =
+        ByteWritePacketImpl(PacketBufferPool).apply(block).build()
+
+fun ByteReadPacket.readUTF8Line(estimate: Int = 16, limit: Int = Int.MAX_VALUE): String? {
+    val sb = StringBuilder(estimate)
+    return if (readUTF8LineTo(sb, limit)) sb.toString() else null
+}
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt
new file mode 100644
index 0000000..6c7021e
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt
@@ -0,0 +1,219 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import org.junit.*
+import java.io.*
+import kotlin.test.*
+
+class ByteBufferChannelScenarioTest : TestBase() {
+    private val ch = ByteBufferChannel(true)
+
+    @Test
+    fun testReadBeforeAvailable() {
+        expect(1)
+
+        runBlocking {
+            launch(coroutineContext) {
+                expect(3)
+
+                val bb = ByteBuffer.allocate(10)
+                val rc = ch.readAvailable(bb) // should suspend
+
+                expect(5)
+                assertEquals(4, rc)
+
+                expect(6)
+            }
+
+            expect(2)
+            yield()
+
+            expect(4)
+            ch.writeInt(0xff) // should resume
+
+            yield()
+
+            finish(7)
+        }
+    }
+
+    @Test
+    fun testReadBeforeAvailable2() {
+        expect(1)
+
+        runBlocking {
+            launch(coroutineContext) {
+                expect(3)
+
+                val bb = ByteBuffer.allocate(4)
+                ch.readFully(bb) // should suspend
+
+                expect(5)
+
+                bb.flip()
+                assertEquals(4, bb.remaining())
+
+                expect(6)
+            }
+
+            expect(2)
+            yield()
+
+            expect(4)
+            ch.writeInt(0xff) // should resume
+
+            yield()
+
+            finish(7)
+        }
+    }
+
+    @Test
+    fun testReadAfterAvailable() {
+        expect(1)
+
+        runBlocking {
+            ch.writeInt(0xff) // should resume
+
+            launch(coroutineContext) {
+                expect(3)
+
+                val bb = ByteBuffer.allocate(10)
+                val rc = ch.readAvailable(bb) // should NOT suspend
+
+                expect(4)
+                assertEquals(4, rc)
+
+                expect(5)
+            }
+
+            expect(2)
+            yield()
+
+            finish(6)
+        }
+    }
+
+    @Test
+    fun testReadAfterAvailable2() {
+        expect(1)
+
+        runBlocking {
+            ch.writeInt(0xff) // should resume
+
+            launch(coroutineContext) {
+                expect(3)
+
+                val bb = ByteBuffer.allocate(4)
+                ch.readFully(bb) // should NOT suspend
+
+                expect(4)
+                bb.flip()
+                assertEquals(4, bb.remaining())
+
+                expect(5)
+            }
+
+            expect(2)
+            yield()
+
+            finish(6)
+        }
+    }
+
+    @Test
+    fun testReadToEmpty() {
+        runBlocking {
+            expect(1)
+
+            val rc = ch.readAvailable(ByteBuffer.allocate(0))
+
+            expect(2)
+
+            assertEquals(0, rc)
+
+            finish(3)
+        }
+    }
+
+    @Test
+    fun testReadToEmptyFromFailedChannel() {
+        runBlocking {
+            expect(1)
+
+            ch.close(IOException())
+
+            try {
+                ch.readAvailable(ByteBuffer.allocate(0))
+                fail("Should throw exception")
+            } catch (expected: IOException) {
+            }
+
+            finish(2)
+        }
+    }
+
+    @Test
+    fun testReadToEmptyFromClosedChannel() {
+        runBlocking {
+            expect(1)
+
+            ch.close()
+
+            val rc = ch.readAvailable(ByteBuffer.allocate(0))
+
+            expect(2)
+
+            assertEquals(-1, rc)
+
+            finish(3)
+        }
+    }
+
+    @Test
+    fun testReadFullyToEmptyFromClosedChannel() {
+        runBlocking {
+            expect(1)
+
+            ch.close()
+
+            ch.readFully(ByteBuffer.allocate(0))
+
+            finish(2)
+        }
+    }
+
+    @Test
+    fun testReadFullyFromClosedChannel() {
+        runBlocking {
+            expect(1)
+
+            ch.close()
+            try {
+                ch.readFully(ByteBuffer.allocate(1))
+                fail("Should throw exception")
+            } catch (expected: ClosedReceiveChannelException) {
+            }
+
+            finish(2)
+        }
+    }
+
+    @Test
+    fun testReadFullyToEmptyFromFailedChannel() {
+        runBlocking {
+            expect(1)
+
+            ch.close(IOException())
+
+            try {
+                ch.readFully(ByteBuffer.allocate(0))
+                fail("Should throw exception")
+            } catch (expected: IOException) {
+            }
+
+            finish(2)
+        }
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt
new file mode 100644
index 0000000..00ab358
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt
@@ -0,0 +1,604 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.io.internal.*
+import kotlinx.coroutines.experimental.io.packet.*
+import org.junit.*
+import org.junit.rules.*
+import java.nio.*
+import java.nio.ByteBuffer
+import java.util.*
+import java.util.concurrent.*
+import kotlin.test.*
+
+class ByteBufferChannelTest {
+    @get:Rule
+    val timeout = Timeout(10, TimeUnit.SECONDS)
+
+    @get:Rule
+    val failures = ErrorCollector()
+
+    private val Size = BUFFER_SIZE - RESERVED_SIZE
+    private val ch = ByteBufferChannel(autoFlush = false, pool = BufferObjectNoPool)
+
+    @Test
+    fun testBoolean() {
+        runBlocking {
+            ch.writeBoolean(true)
+            ch.flush()
+            assertEquals(true, ch.readBoolean())
+
+            ch.writeBoolean(false)
+            ch.flush()
+            assertEquals(false, ch.readBoolean())
+        }
+    }
+
+    @Test
+    fun testByte() {
+        runBlocking {
+            assertEquals(0, ch.availableForRead)
+            ch.writeByte(-1)
+            ch.flush()
+            assertEquals(1, ch.availableForRead)
+            assertEquals(-1, ch.readByte())
+            assertEquals(0, ch.availableForRead)
+        }
+    }
+
+    @Test
+    fun testShortB() {
+        runBlocking {
+            ch.readByteOrder = ByteOrder.BIG_ENDIAN
+            ch.writeByteOrder = ByteOrder.BIG_ENDIAN
+
+            assertEquals(0, ch.availableForRead)
+            ch.writeShort(-1)
+            assertEquals(0, ch.availableForRead)
+            ch.flush()
+            assertEquals(2, ch.availableForRead)
+            assertEquals(-1, ch.readShort())
+            assertEquals(0, ch.availableForRead)
+        }
+    }
+
+    @Test
+    fun testShortL() {
+        runBlocking {
+            ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
+            ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
+
+            assertEquals(0, ch.availableForRead)
+            ch.writeShort(-1)
+            assertEquals(0, ch.availableForRead)
+            ch.flush()
+            assertEquals(2, ch.availableForRead)
+            assertEquals(-1, ch.readShort())
+            assertEquals(0, ch.availableForRead)
+        }
+    }
+
+    @Test
+    fun testShortEdge() {
+        runBlocking {
+            ch.writeByte(1)
+
+            for (i in 2 until Size step 2) {
+                ch.writeShort(0x00ee)
+            }
+
+            ch.flush()
+
+            ch.readByte()
+            ch.writeShort(0x1234)
+
+            ch.flush()
+
+            while (ch.availableForRead > 2) {
+                ch.readShort()
+            }
+
+            assertEquals(0x1234, ch.readShort())
+        }
+    }
+
+    @Test
+    fun testIntB() {
+        runBlocking {
+            ch.readByteOrder = ByteOrder.BIG_ENDIAN
+            ch.writeByteOrder = ByteOrder.BIG_ENDIAN
+
+            assertEquals(0, ch.availableForRead)
+            ch.writeInt(-1)
+            ch.flush()
+            assertEquals(4, ch.availableForRead)
+            assertEquals(-1, ch.readInt())
+            assertEquals(0, ch.availableForRead)
+        }
+    }
+
+    @Test
+    fun testIntL() {
+        runBlocking {
+            ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
+            ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
+
+            assertEquals(0, ch.availableForRead)
+            ch.writeInt(-1)
+            ch.flush()
+            assertEquals(4, ch.availableForRead)
+            assertEquals(-1, ch.readInt())
+            assertEquals(0, ch.availableForRead)
+        }
+    }
+
+    @Test
+    fun testIntEdge() {
+        runBlocking {
+            for (shift in 1..3) {
+                for (i in 1..shift) {
+                    ch.writeByte(1)
+                }
+
+                repeat(Size / 4 - 1) {
+                    ch.writeInt(0xeeeeeeeeL)
+                }
+
+                ch.flush()
+
+                for (i in 1..shift) {
+                    ch.readByte()
+                }
+
+                ch.writeInt(0x12345678)
+
+                ch.flush()
+
+                while (ch.availableForRead > 4) {
+                    ch.readInt()
+                }
+
+                assertEquals(0x12345678, ch.readInt())
+            }
+        }
+    }
+
+    @Test
+    fun testLongB() {
+        runBlocking {
+            ch.readByteOrder = ByteOrder.BIG_ENDIAN
+            ch.writeByteOrder = ByteOrder.BIG_ENDIAN
+
+            assertEquals(0, ch.availableForRead)
+            ch.writeLong(Long.MIN_VALUE)
+            ch.flush()
+            assertEquals(8, ch.availableForRead)
+            assertEquals(Long.MIN_VALUE, ch.readLong())
+            assertEquals(0, ch.availableForRead)
+        }
+    }
+
+    @Test
+    fun testLongL() {
+        runBlocking {
+            ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
+            ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
+
+            assertEquals(0, ch.availableForRead)
+            ch.writeLong(Long.MIN_VALUE)
+            ch.flush()
+            assertEquals(8, ch.availableForRead)
+            assertEquals(Long.MIN_VALUE, ch.readLong())
+            assertEquals(0, ch.availableForRead)
+        }
+    }
+
+    @Test
+    fun testLongEdge() {
+        runBlocking {
+            for (shift in 1..7) {
+                for (i in 1..shift) {
+                    ch.writeByte(1)
+                }
+
+                repeat(Size / 8 - 1) {
+                    ch.writeLong(0x11112222eeeeeeeeL)
+                }
+
+                ch.flush()
+                for (i in 1..shift) {
+                    ch.readByte()
+                }
+
+                ch.writeLong(0x1234567812345678L)
+                ch.flush()
+
+                while (ch.availableForRead > 8) {
+                    ch.readLong()
+                }
+
+                assertEquals(0x1234567812345678L, ch.readLong())
+            }
+        }
+    }
+
+    @Test
+    fun testDoubleB() {
+        runBlocking {
+            ch.readByteOrder = ByteOrder.BIG_ENDIAN
+            ch.writeByteOrder = ByteOrder.BIG_ENDIAN
+
+            assertEquals(0, ch.availableForRead)
+            ch.writeDouble(1.05)
+            ch.flush()
+
+            assertEquals(8, ch.availableForRead)
+            assertEquals(1.05, ch.readDouble())
+            assertEquals(0, ch.availableForRead)
+        }
+    }
+
+    @Test
+    fun testDoubleL() {
+        runBlocking {
+            ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
+            ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
+
+            assertEquals(0, ch.availableForRead)
+            ch.writeDouble(1.05)
+            ch.flush()
+
+            assertEquals(8, ch.availableForRead)
+            assertEquals(1.05, ch.readDouble())
+            assertEquals(0, ch.availableForRead)
+        }
+    }
+
+    @Test
+    fun testFloatB() {
+        runBlocking {
+            ch.readByteOrder = ByteOrder.BIG_ENDIAN
+            ch.writeByteOrder = ByteOrder.BIG_ENDIAN
+
+            assertEquals(0, ch.availableForRead)
+            ch.writeFloat(1.05f)
+            ch.flush()
+
+            assertEquals(4, ch.availableForRead)
+            assertEquals(1.05f, ch.readFloat())
+            assertEquals(0, ch.availableForRead)
+        }
+    }
+
+    @Test
+    fun testFloatL() {
+        runBlocking {
+            ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
+            ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
+
+            assertEquals(0, ch.availableForRead)
+            ch.writeFloat(1.05f)
+            ch.flush()
+
+            assertEquals(4, ch.availableForRead)
+            assertEquals(1.05f, ch.readFloat())
+            assertEquals(0, ch.availableForRead)
+        }
+    }
+
+
+
+    @Test
+    fun testEndianMix() {
+        val byteOrders = listOf(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN)
+        runBlocking {
+            for (writeOrder in byteOrders) {
+                ch.writeByteOrder = writeOrder
+
+                for (readOrder in byteOrders) {
+                    ch.readByteOrder = readOrder
+
+                    assertEquals(0, ch.availableForRead)
+                    ch.writeShort(0x001f)
+                    ch.flush()
+                    if (writeOrder == readOrder)
+                        assertEquals(0x001f, ch.readShort())
+                    else
+                        assertEquals(0x1f00, ch.readShort())
+
+                    assertEquals(0, ch.availableForRead)
+                    ch.writeShort(0x001f)
+                    ch.flush()
+                    if (writeOrder == readOrder)
+                        assertEquals(0x001f, ch.readShort())
+                    else
+                        assertEquals(0x1f00, ch.readShort())
+
+                    assertEquals(0, ch.availableForRead)
+                    ch.writeInt(0x1f)
+                    ch.flush()
+                    if (writeOrder == readOrder)
+                        assertEquals(0x0000001f, ch.readInt())
+                    else
+                        assertEquals(0x1f000000, ch.readInt())
+
+                    assertEquals(0, ch.availableForRead)
+                    ch.writeInt(0x1fL)
+                    ch.flush()
+                    if (writeOrder == readOrder)
+                        assertEquals(0x0000001f, ch.readInt())
+                    else
+                        assertEquals(0x1f000000, ch.readInt())
+
+                    assertEquals(0, ch.availableForRead)
+                    ch.writeLong(0x1f)
+                    ch.flush()
+                    if (writeOrder == readOrder)
+                        assertEquals(0x1f, ch.readLong())
+                    else
+                        assertEquals(0x1f00000000000000L, ch.readLong())
+                }
+            }
+        }
+    }
+
+    @Test
+    fun testClose() {
+        runBlocking {
+            ch.writeByte(1)
+            ch.writeByte(2)
+            ch.writeByte(3)
+
+            ch.flush()
+            assertEquals(1, ch.readByte())
+            ch.close()
+
+            assertEquals(2, ch.readByte())
+            assertEquals(3, ch.readByte())
+
+            try {
+                ch.readByte()
+                fail()
+            } catch (expected: ClosedReceiveChannelException) {
+            }
+        }
+    }
+
+    @Test
+    fun testReadAndWriteFully() {
+        runBlocking {
+            val bytes = byteArrayOf(1, 2, 3, 4, 5)
+            val dst = ByteArray(5)
+
+            ch.writeFully(bytes)
+            ch.flush()
+            assertEquals(5, ch.availableForRead)
+            ch.readFully(dst)
+            assertTrue { dst.contentEquals(bytes) }
+
+            ch.writeFully(bytes)
+            ch.flush()
+
+            val dst2 = ByteArray(4)
+            ch.readFully(dst2)
+
+            assertEquals(1, ch.availableForRead)
+            assertEquals(5, ch.readByte())
+
+            ch.close()
+
+            try {
+                ch.readFully(dst)
+                fail("")
+            } catch (expected: ClosedReceiveChannelException) {
+            }
+        }
+    }
+
+    @Test
+    fun testReadAndWriteFullyByteBuffer() {
+        runBlocking {
+            val bytes = byteArrayOf(1, 2, 3, 4, 5)
+            val dst = ByteArray(5)
+
+            ch.writeFully(ByteBuffer.wrap(bytes))
+            ch.flush()
+            assertEquals(5, ch.availableForRead)
+            ch.readFully(ByteBuffer.wrap(dst))
+            assertTrue { dst.contentEquals(bytes) }
+
+            ch.writeFully(ByteBuffer.wrap(bytes))
+            ch.flush()
+
+            val dst2 = ByteArray(4)
+            ch.readFully(ByteBuffer.wrap(dst2))
+
+            assertEquals(1, ch.availableForRead)
+            assertEquals(5, ch.readByte())
+
+            ch.close()
+
+            try {
+                ch.readFully(ByteBuffer.wrap(dst))
+                fail("")
+            } catch (expected: ClosedReceiveChannelException) {
+            }
+        }
+    }
+
+    @Test
+    fun testReadAndWritePartially() {
+        runBlocking {
+            val bytes = byteArrayOf(1, 2, 3, 4, 5)
+
+            assertEquals(5, ch.writeAvailable(bytes))
+            ch.flush()
+            assertEquals(5, ch.readAvailable(ByteArray(100)))
+
+            repeat(Size / bytes.size) {
+                assertNotEquals(0, ch.writeAvailable(bytes))
+                ch.flush()
+            }
+
+            ch.readAvailable(ByteArray(ch.availableForRead - 1))
+            assertEquals(1, ch.readAvailable(ByteArray(100)))
+
+            ch.close()
+        }
+    }
+
+    @Test
+    fun testReadAndWritePartiallyByteBuffer() {
+        runBlocking {
+            val bytes = byteArrayOf(1, 2, 3, 4, 5)
+
+            assertEquals(5, ch.writeAvailable(ByteBuffer.wrap(bytes)))
+            ch.flush()
+            assertEquals(5, ch.readAvailable(ByteBuffer.allocate(100)))
+
+            repeat(Size / bytes.size) {
+                assertNotEquals(0, ch.writeAvailable(ByteBuffer.wrap(bytes)))
+                ch.flush()
+            }
+
+            ch.readAvailable(ByteArray(ch.availableForRead - 1))
+            assertEquals(1, ch.readAvailable(ByteBuffer.allocate(100)))
+
+            ch.close()
+        }
+    }
+
+
+    @Test
+    fun testReadAndWriteBig() {
+        val count = 200
+        val bytes = ByteArray(65536)
+        Random().nextBytes(bytes)
+
+        launch(CommonPool + CoroutineName("writer")) {
+            for (i in 1..count) {
+                ch.writeFully(bytes)
+                ch.flush()
+            }
+        }.invokeOnCompletion { t ->
+            if (t != null) {
+                failures.addError(t)
+            }
+        }
+
+        runBlocking(CoroutineName("reader")) {
+            val dst = ByteArray(bytes.size)
+            for (i in 1..count) {
+                ch.readFully(dst)
+                assertTrue { dst.contentEquals(bytes) }
+                dst.fill(0)
+            }
+        }
+    }
+
+    @Test
+    fun testReadAndWriteBigByteBuffer() {
+        val count = 200
+        val bytes = ByteArray(65536)
+        Random().nextBytes(bytes)
+
+        launch(CommonPool + CoroutineName("writer")) {
+            for (i in 1..count) {
+                ch.writeFully(ByteBuffer.wrap(bytes))
+                ch.flush()
+            }
+        }.invokeOnCompletion { t ->
+            if (t != null) {
+                failures.addError(t)
+            }
+        }
+
+        runBlocking(CoroutineName("reader")) {
+            val dst = ByteArray(bytes.size)
+            for (i in 1..count) {
+                ch.readFully(ByteBuffer.wrap(dst))
+                assertTrue { dst.contentEquals(bytes) }
+                dst.fill(0)
+            }
+        }
+    }
+
+    @Test
+    fun testPacket() = runBlocking {
+        val packet = buildPacket {
+            writeInt(0xffee)
+            writeStringUtf8("Hello")
+        }
+
+        ch.writeInt(packet.remaining)
+        ch.writePacket(packet)
+
+        ch.flush()
+
+        val size = ch.readInt()
+        val readed = ch.readPacket(size)
+
+        assertEquals(0xffee, readed.readInt())
+        assertEquals("Hello", readed.readUTF8Line())
+    }
+
+    @Test
+    fun testBigPacket() = runBlocking {
+        launch(CommonPool + CoroutineName("writer")) {
+            val packet = buildPacket {
+                writeInt(0xffee)
+                writeStringUtf8(".".repeat(8192))
+            }
+
+            ch.writeInt(packet.remaining)
+            ch.writePacket(packet)
+
+            ch.flush()
+        }
+
+        val size = ch.readInt()
+        val readed = ch.readPacket(size)
+
+        assertEquals(0xffee, readed.readInt())
+        assertEquals(".".repeat(8192), readed.readUTF8Line())
+    }
+
+    @Test
+    fun testWriteString() = runBlocking {
+        ch.writeStringUtf8("abc")
+        ch.close()
+
+        assertEquals("abc", ch.readASCIILine())
+    }
+
+    @Test
+    fun testWriteCharSequence() = runBlocking {
+        ch.writeStringUtf8("abc" as CharSequence)
+        ch.close()
+
+        assertEquals("abc", ch.readASCIILine())
+    }
+
+    @Test
+    fun testWriteCharBuffer() = runBlocking {
+        val cb = CharBuffer.allocate(6)
+
+        for (i in 0 until cb.remaining()) {
+            cb.put(i, ' ')
+        }
+
+        cb.position(2)
+        cb.put(2, 'a')
+        cb.put(3, 'b')
+        cb.put(4, 'c')
+        cb.limit(5)
+
+        assertEquals("abc", cb.slice().toString())
+
+        ch.writeStringUtf8(cb)
+        ch.close()
+
+        assertEquals("abc", ch.readASCIILine())
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/BytePacketBuildTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/BytePacketBuildTest.kt
new file mode 100644
index 0000000..b7de477
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/BytePacketBuildTest.kt
@@ -0,0 +1,63 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.io.packet.*
+import org.junit.*
+import kotlin.test.*
+
+class BytePacketBuildTest {
+    @Test
+    fun smokeSingleBufferTest() {
+        val p = buildPacket {
+            writeByte(0x12)
+            writeShort(0x1234)
+            writeInt(0x12345678)
+            writeDouble(1.23)
+            writeFloat(1.23f)
+            writeLong(0x123456789abcdef0)
+
+            writeStringUtf8("OK\n")
+            listOf(1, 2, 3).joinTo(this, separator = "|")
+        }
+
+        assertEquals(1 + 2 + 4 + 8 + 4 + 8 + 3 + 5, p.remaining)
+
+        assertEquals(0x12, p.readByte())
+        assertEquals(0x1234, p.readShort())
+        assertEquals(0x12345678, p.readInt())
+        assertEquals(1.23, p.readDouble())
+        assertEquals(1.23f, p.readFloat())
+        assertEquals(0x123456789abcdef0, p.readLong())
+
+        assertEquals("OK", p.readUTF8Line())
+        assertEquals("1|2|3", p.readUTF8Line())
+    }
+
+    @Test
+    fun smokeMultiBufferTest() {
+        val p = buildPacket {
+            writeFully(ByteArray(9999))
+            writeByte(0x12)
+            writeShort(0x1234)
+            writeInt(0x12345678)
+            writeDouble(1.23)
+            writeFloat(1.23f)
+            writeLong(0x123456789abcdef0)
+
+            writeStringUtf8("OK\n")
+            listOf(1, 2, 3).joinTo(this, separator = "|")
+        }
+
+        assertEquals(9999 + 1 + 2 + 4 + 8 + 4 + 8 + 3 + 5, p.remaining)
+
+        p.readFully(ByteArray(9999))
+        assertEquals(0x12, p.readByte())
+        assertEquals(0x1234, p.readShort())
+        assertEquals(0x12345678, p.readInt())
+        assertEquals(1.23, p.readDouble())
+        assertEquals(1.23f, p.readFloat())
+        assertEquals(0x123456789abcdef0, p.readLong())
+
+        assertEquals("OK", p.readUTF8Line())
+        assertEquals("1|2|3", p.readUTF8Line())
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ContentByteBufferTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ContentByteBufferTest.kt
new file mode 100644
index 0000000..2b9547e
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ContentByteBufferTest.kt
@@ -0,0 +1,53 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.runBlocking
+import org.junit.Test
+import java.util.*
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+
+class ContentByteBufferTest {
+    @Test
+    fun testEmptyContent() = runBlocking {
+        val ch = ByteReadChannel(ByteArray(0))
+        assertEquals(0, ch.availableForRead)
+        assertEquals(-1, ch.readAvailable(ByteBuffer.allocate(100)))
+        assertTrue { ch.isClosedForRead }
+    }
+
+    @Test
+    fun testSingleByteContent() = runBlocking {
+        val ch = ByteReadChannel(byteArrayOf(1))
+        assertEquals(1, ch.availableForRead)
+        assertFalse { ch.isClosedForRead }
+        assertEquals(1, ch.readAvailable(ByteBuffer.allocate(100)))
+        assertEquals(0, ch.availableForRead)
+        assertTrue { ch.isClosedForRead }
+    }
+
+    @Test
+    fun testSingleByteContent2() = runBlocking {
+        val ch = ByteReadChannel(byteArrayOf(0x34))
+        assertEquals(1, ch.availableForRead)
+        assertFalse { ch.isClosedForRead }
+        assertEquals(0x34, ch.readByte())
+        assertEquals(0, ch.availableForRead)
+        assertTrue { ch.isClosedForRead }
+    }
+
+    @Test
+    fun testMultipleByteContent2() = runBlocking {
+        val arr = ByteArray(16)
+        Random().nextBytes(arr)
+        val ch = ByteReadChannel(arr)
+        assertEquals(16, ch.availableForRead)
+        assertFalse { ch.isClosedForRead }
+        ch.readByte()
+        ch.readShort()
+        ch.readInt()
+        ch.readLong()
+        ch.readByte()
+        assertTrue { ch.isClosedForRead }
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt
new file mode 100644
index 0000000..0ba8742
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/CopyAndCloseTest.kt
@@ -0,0 +1,74 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+import java.io.*
+import kotlin.test.*
+
+class CopyAndCloseTest : TestBase() {
+    private val from = ByteChannel(true)
+    private val to = ByteChannel(true)
+
+    @Test
+    fun smokeTest() = runBlocking {
+        expect(1)
+
+        launch(coroutineContext) {
+            expect(2)
+            val copied = from.copyAndClose(to) // should suspend
+
+            expect(7)
+
+            assertEquals(8, copied)
+        }
+
+        yield()
+
+        expect(3)
+        from.writeInt(1)
+        expect(4)
+        from.writeInt(2)
+        expect(5)
+
+        yield()
+        expect(6)
+
+        from.close()
+        yield()
+
+        finish(8)
+    }
+
+    @Test
+    fun failurePropagation() = runBlocking {
+        expect(1)
+
+        launch(coroutineContext) {
+            expect(2)
+
+            try {
+                from.copyAndClose(to) // should suspend and then throw IOException
+                fail("Should rethrow exception")
+            } catch (expected: IOException) {
+            }
+
+            expect(4)
+        }
+
+        yield()
+        expect(3)
+
+        from.close(IOException())
+        yield()
+
+        expect(5)
+
+        try {
+            to.readInt()
+            fail("Should throw exception")
+        } catch (expected: IOException) {
+        }
+
+        finish(6)
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/PooledBufferTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/PooledBufferTest.kt
new file mode 100644
index 0000000..d8d15a0
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/PooledBufferTest.kt
@@ -0,0 +1,96 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.io.internal.ObjectPool
+import kotlinx.coroutines.experimental.io.internal.ReadWriteBufferState
+import kotlinx.coroutines.experimental.runBlocking
+import org.junit.After
+import org.junit.Test
+import java.io.*
+import java.nio.ByteBuffer
+import java.util.concurrent.CopyOnWriteArrayList
+import kotlin.test.assertEquals
+import kotlin.test.assertTrue
+import kotlin.test.fail
+
+class PooledBufferTest {
+    private val allocated = CopyOnWriteArrayList<ByteBuffer>()
+
+    private inner class TestPool : ObjectPool<ReadWriteBufferState.Initial> {
+        override val capacity: Int get() = 0
+
+        override fun borrow(): ReadWriteBufferState.Initial {
+            val buffer = ReadWriteBufferState.Initial(ByteBuffer.allocate(4096))
+            allocated.add(buffer.backingBuffer)
+            return buffer
+        }
+
+        override fun recycle(instance: ReadWriteBufferState.Initial) {
+            if (!allocated.remove(instance.backingBuffer)) {
+                fail("Couldn't release buffer from pool")
+            }
+        }
+
+        override fun dispose() {
+        }
+    }
+
+    private val channel = ByteBufferChannel(autoFlush = true, pool = TestPool())
+
+    @After
+    fun tearDown() {
+        assertTrue { allocated.isEmpty() }
+    }
+
+    @Test
+    fun testWriteReadClose() {
+        runBlocking {
+            channel.writeInt(1)
+            assertEquals(1, allocated.size)
+            channel.readInt()
+            channel.close()
+            assertEquals(0, allocated.size)
+        }
+    }
+
+    @Test
+    fun testWriteCloseRead() {
+        runBlocking {
+            channel.writeInt(1)
+            assertEquals(1, allocated.size)
+            channel.close()
+            channel.readInt()
+            assertEquals(0, allocated.size)
+        }
+    }
+
+    @Test
+    fun testWriteCloseReadRead() {
+        runBlocking {
+            channel.writeInt(1)
+            assertEquals(1, allocated.size)
+            channel.close()
+            channel.readShort()
+            assertEquals(1, allocated.size)
+            channel.readShort()
+            assertEquals(0, allocated.size)
+        }
+    }
+
+    @Test
+    fun testCloseOnly() {
+        runBlocking {
+            channel.close()
+            assertEquals(0, allocated.size)
+        }
+    }
+
+    @Test
+    fun testCloseWithEerror() {
+        runBlocking {
+            channel.writeFully("OK".toByteArray())
+            assertEquals(1, allocated.size)
+            channel.close(IOException())
+            assertEquals(0, allocated.size)
+        }
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/StringScenarioTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/StringScenarioTest.kt
new file mode 100644
index 0000000..0056874
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/StringScenarioTest.kt
@@ -0,0 +1,268 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+import org.junit.rules.*
+import java.util.concurrent.*
+import kotlin.test.*
+
+class StringScenarioTest : TestBase() {
+    @get:Rule
+    val timeout = Timeout(10L, TimeUnit.SECONDS)
+
+    private val ch = ByteBufferChannel(autoFlush = true)
+
+    @Test
+    fun testWriteCharByChar() {
+        runBlocking {
+            expect(1)
+
+            launch(coroutineContext) {
+                ch.writeStringUtf8("A")
+                expect(3)
+                yield()
+
+                expect(5)
+                ch.writeStringUtf8("B")
+                expect(6)
+                yield()
+
+                expect(7)
+                ch.writeStringUtf8("\n")
+                expect(8)
+                yield()
+            }
+
+            expect(2)
+            yield()
+
+            expect(4)
+            val line = ch.readUTF8Line()
+
+            assertEquals("AB", line)
+            expect(9)
+
+            yield()
+            expect(10)
+            finish(11)
+        }
+    }
+
+    @Test
+    fun testSplitUtf8() {
+        runBlocking {
+            val sb = StringBuilder()
+
+            expect(1)
+
+            launch(coroutineContext) {
+                expect(2)
+                val b = byteArrayOf(0xd0.toByte(), 0x9a.toByte(), 0x0a)
+                ch.writeFully(b, 0, 1)
+                yield()
+
+                expect(3)
+                assertTrue { sb.isEmpty() }
+
+                ch.writeFully(b, 1, 1)
+                yield()
+
+                expect(4)
+                assertEquals("\u041a", sb.toString())
+
+                ch.writeFully(b, 2, 1)
+                yield()
+            }
+
+            ch.readUTF8LineTo(sb)
+            expect(5)
+
+            assertEquals("\u041a", sb.toString())
+
+            finish(6)
+        }
+    }
+
+    @Test
+    fun testSplitLineDelimiter() = runBlocking {
+        expect(1)
+
+        launch(coroutineContext) {
+            expect(2)
+            ch.writeFully("ABC\r".toByteArray())
+            expect(3)
+            yield()
+
+            expect(5)
+            ch.writeFully("\n".toByteArray())
+            yield()
+        }
+
+        yield()
+
+        expect(4)
+        val line = ch.readASCIILine()
+        expect(6)
+
+        assertEquals("ABC", line)
+
+        finish(7)
+    }
+
+    @Test
+    fun testReadTailWriteFirst() = runBlocking {
+        expect(1)
+
+        launch(coroutineContext) {
+            expect(2)
+
+            ch.writeFully("ABC".toByteArray())
+
+            yield()
+
+            expect(4)
+            ch.close()
+            yield()
+        }
+
+        yield()
+
+        expect(3)
+
+        val line = ch.readUTF8Line()
+        expect(5)
+        assertEquals("ABC", line)
+
+        finish(6)
+    }
+
+    @Test
+    fun testReadTailReadFirst() = runBlocking {
+        expect(1)
+
+        launch(coroutineContext) {
+            expect(3)
+
+            ch.writeFully("ABC".toByteArray())
+
+            yield()
+
+            expect(4)
+            ch.close()
+            yield()
+        }
+
+        expect(2)
+
+        val line = ch.readUTF8Line()
+        expect(5)
+        assertEquals("ABC", line)
+
+        finish(6)
+    }
+
+    @Test
+    fun testReadThroughWrap() = runBlocking {
+        val L = ".".repeat(128)
+
+        expect(1)
+
+        launch(coroutineContext) {
+            expect(2)
+
+            ch.writeFully(ByteArray(4000))
+
+            expect(3)
+            ch.readFully(ByteArray(3999)) // keep one byte remaining to keep buffer unreleased
+
+            expect(4)
+
+            ch.writeFully(L.toByteArray())
+
+            expect(5)
+            ch.close()
+        }
+
+        yield()
+
+        expect(6)
+
+        ch.readByte()
+        expect(7)
+
+        val line = ch.readUTF8Line()
+
+        finish(8)
+
+        assertEquals(L, line)
+    }
+
+    @Test
+    fun testReadShifted() = runBlocking {
+        val L = ".".repeat(127) + "\n"
+        var base = 0
+
+        for (shift in 1..4096 - 8) {
+            expect(base + 1)
+
+            launch(coroutineContext) {
+                expect(base + 2)
+
+                ch.writeFully(ByteArray(shift))
+
+                expect(base + 3)
+                ch.readFully(ByteArray(shift - 1)) // keep one byte remaining to keep buffer unreleased
+
+                expect(base + 4)
+
+                ch.writeFully(L.toByteArray())
+
+                expect(base + 5)
+            }
+
+            yield()
+
+            expect(base + 6)
+
+            ch.readByte()
+            expect(base + 7)
+
+            val line = ch.readUTF8Line()
+
+            expect(base + 8)
+
+            assertEquals(L.dropLast(1), line)
+
+            base += 8
+        }
+
+        finish(base + 1)
+    }
+
+    @Test
+    fun writeLongLine() = runBlocking {
+        val L = ".".repeat(16384)
+
+        expect(1)
+
+        launch(coroutineContext) {
+            expect(2)
+
+            ch.writeFully(L.toByteArray())
+
+            expect(4)
+            ch.close()
+        }
+
+        yield()
+
+        expect(3)
+        val line = ch.readUTF8Line()
+
+        expect(5)
+
+        assertEquals(L, line)
+
+        finish(6)
+    }
+}
\ No newline at end of file
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/StringsTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/StringsTest.kt
new file mode 100644
index 0000000..49a3656
--- /dev/null
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/StringsTest.kt
@@ -0,0 +1,301 @@
+package kotlinx.coroutines.experimental.io
+
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+import org.junit.rules.*
+import java.util.*
+import java.util.concurrent.*
+import kotlin.test.*
+
+class StringsTest {
+    @get:Rule
+    val timeout = Timeout(10, TimeUnit.SECONDS)
+
+    private val channel = ByteBufferChannel(autoFlush = true)
+
+    @Test
+    fun testReadString() {
+        runBlocking {
+            writeString("Hello, World!")
+            channel.close()
+            assertEquals("Hello, World!", channel.readASCIILine())
+        }
+    }
+
+    @Test
+    fun testReadLines1() {
+        testReadLine("\r", "", "")
+    }
+
+    @Test
+    fun testReadLinesCases() {
+        testReadLine("abc", "abc", "")
+        testReadLine("", null, "")
+
+        testReadLine("\n", "", "")
+        testReadLine("\r", "", "")
+        testReadLine("\r\n", "", "")
+        testReadLine("1\n", "1", "")
+        testReadLine("1\r", "1", "")
+        testReadLine("1\r\n", "1", "")
+
+        testReadLine("\n2", "", "2")
+        testReadLine("\r2", "", "2")
+        testReadLine("\r\n2", "", "2")
+        testReadLine("1\n2", "1", "2")
+        testReadLine("1\r2", "1", "2")
+        testReadLine("1\r\n2", "1", "2")
+
+        // unicode
+        testReadLine("\u0440\n", "\u0440", "")
+        testReadLine("\u0440\n1", "\u0440", "1")
+        testReadLine("\u0440\r", "\u0440", "")
+        testReadLine("\u0440\r2", "\u0440", "2")
+    }
+
+    private fun testReadLine(source: String, expectedLine: String?, expectedRemaining: String) {
+        val content = source.toByteArray(Charsets.UTF_8)
+
+        // no splitting
+        runBlocking {
+            val ch = ByteReadChannel(content)
+//            testReadLine(ch, expectedLine, expectedRemaining)
+        }
+
+        // split
+        for (splitAt in 0 until content.size) {
+            val ch = ByteChannel(true)
+            runBlocking {
+                launch(coroutineContext) {
+                    ch.writeFully(content, 0, splitAt)
+                    yield()
+                    ch.writeFully(content, splitAt, content.size - splitAt)
+                    ch.close()
+                }
+
+                testReadLine(ch, expectedLine, expectedRemaining)
+            }
+        }
+    }
+
+    private suspend fun testReadLine(ch: ByteReadChannel, expectedLine: String?, expectedRemaining: String) {
+        val line = ch.readUTF8Line()
+        assertEquals(expectedLine, line)
+
+        val buffer = ByteBuffer.allocate(8192)
+        val rc = ch.readAvailable(buffer)
+
+        if (expectedRemaining.isNotEmpty()) {
+            assertNotEquals(-1, rc, "Unexpected EOF. Expected >= 0")
+        }
+
+        buffer.flip()
+        assertEquals(expectedRemaining, Charsets.UTF_8.decode(buffer).toString())
+    }
+
+    @Test
+    fun testReadLines() {
+        runBlocking {
+            writeString("Hello, World!\nLine2")
+            assertEquals("Hello, World!", channel.readASCIILine())
+            channel.close()
+            assertEquals("Line2", channel.readASCIILine())
+        }
+    }
+
+    @Test
+    fun testReadASCIILineLf() {
+        runBlocking {
+            writeParts("A", "B\n", "C")
+
+            assertEquals("AB", channel.readASCIILine())
+            assertEquals("C", channel.readASCIILine())
+            assertEquals(null, channel.readASCIILine())
+        }
+    }
+
+    @Test
+    fun testReadASCIILineCrLf() {
+        runBlocking {
+            writeParts("A", "B\r\n", "C")
+
+            assertEquals("AB", channel.readASCIILine())
+            assertEquals("C", channel.readASCIILine())
+            assertEquals(null, channel.readASCIILine())
+        }
+    }
+
+    @Test
+    fun testReadASCIILineCrLfBadSplit() {
+        runBlocking {
+            writeParts("A", "B\r", "\nC")
+
+            assertEquals("AB", channel.readASCIILine())
+            assertEquals("C", channel.readASCIILine())
+            assertEquals(null, channel.readASCIILine())
+        }
+    }
+
+    @Test
+    fun testReadASCIILineTrailingLf() {
+        runBlocking {
+            writeParts("A", "B\n", "C\n")
+
+            assertEquals("AB", channel.readASCIILine())
+            assertEquals("C", channel.readASCIILine())
+            assertEquals(null, channel.readASCIILine())
+        }
+    }
+
+    @Test
+    fun testReadASCIILineLeadingLf() {
+        runBlocking {
+            writeParts("\nA", "B\n", "C")
+
+            assertEquals("", channel.readASCIILine())
+            assertEquals("AB", channel.readASCIILine())
+            assertEquals("C", channel.readASCIILine())
+            assertEquals(null, channel.readASCIILine())
+        }
+    }
+
+    @Test
+    fun testLookAhead() {
+        val text = buildString() {
+            for (i in 0 until 65535) {
+                append((i and 0xf).toString(16))
+            }
+        }.toByteArray()
+
+        runBlocking {
+            launch(CommonPool) {
+                channel.writeFully(text)
+                channel.close()
+            }
+
+            val comparison = ByteBuffer.wrap(text)
+
+            val arr = ByteArray(128)
+            var rem = text.size
+            val rnd = Random()
+
+            while (rem > 0) {
+                val s = rnd.nextInt(arr.size).coerceIn(1, rem)
+                arr.fill(0)
+                val rc = channel.readAvailable(arr, 0, s)
+
+                if (rc == -1) fail("EOF")
+
+                val actual = String(arr, 0, rc)
+
+                val expectedBytes = ByteArray(rc)
+                comparison.get(expectedBytes)
+                val expected = expectedBytes.toString(Charsets.ISO_8859_1)
+
+                assertEquals(expected, actual)
+
+                rem -= rc
+            }
+        }
+    }
+
+    @Test
+    fun testLongLinesConcurrent() {
+        val lines = (0..1024).map { size ->
+            buildString(size) {
+                for (i in 0 until size) {
+                    append((i and 0xf).toString(16))
+                }
+            }
+        }
+
+        runBlocking {
+            launch(CommonPool) {
+                for (part in lines) {
+                    writeString(part + "\n")
+                }
+                channel.close()
+            }
+
+            for (expected in lines) {
+                assertEquals(expected, channel.readASCIILine(expected.length))
+            }
+
+            assertNull(channel.readASCIILine())
+        }
+    }
+
+    @Test
+    fun testLongLinesSequential() {
+        val lines = (0..1024).map { size ->
+            buildString(size) {
+                for (i in 0 until size) {
+                    append((i and 0xf).toString(16))
+                }
+            }
+        }
+
+        runBlocking {
+            launch(coroutineContext) {
+                for (part in lines) {
+                    writeString(part + "\n")
+                    yield()
+                }
+                channel.close()
+            }
+
+            for (expected in lines) {
+                Thread.yield()
+                assertEquals(expected, channel.readASCIILine(expected.length))
+            }
+
+            assertNull(channel.readASCIILine())
+        }
+    }
+
+    @Test
+    fun testReadUTF8Line2bytes() {
+        val parts = byteArrayOf(0xd0.toByte(), 0x9a.toByte(), 0x0a)
+
+        runBlocking {
+            channel.writeFully(parts)
+            assertEquals("\u041a", channel.readUTF8Line())
+        }
+    }
+
+    @Test
+    fun testReadUTF8Line3bytes() {
+        val parts = byteArrayOf(0xe0.toByte(), 0xaf.toByte(), 0xb5.toByte(), 0x0a)
+
+        runBlocking {
+            channel.writeFully(parts)
+            assertEquals("\u0BF5", channel.readUTF8Line())
+        }
+    }
+
+    @Test
+    fun testReadUTF8Line4bytes() {
+        val parts = byteArrayOf(0xF0.toByte(), 0xA6.toByte(), 0x88.toByte(), 0x98.toByte(), 0x0a)
+
+        runBlocking {
+            channel.writeFully(parts)
+            assertEquals("\uD858\uDE18", channel.readUTF8Line())
+        }
+    }
+
+    private suspend fun writeString(s: String) {
+        channel.writeFully(s.toByteArray(Charsets.ISO_8859_1))
+    }
+
+    private fun writeParts(vararg parts: String) {
+        launch(CommonPool) {
+            parts.forEach { p ->
+                writeString(p)
+                yield()
+                delay(1)
+            }
+
+            channel.close()
+        }
+    }
+}
\ No newline at end of file