IO: introduce ByteChannel.joinTo
diff --git a/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt b/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
index b2d0ce9..f82aec8 100644
--- a/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
+++ b/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
@@ -38,6 +38,10 @@
     private var closed: ClosedElement? = null
 
     @Volatile
+    private var delegatedTo: ByteBufferChannel? = null
+    private var delegateClose: Boolean = false
+
+    @Volatile
     private var readOp: CancellableContinuation<Boolean>? = null
 
     @Volatile
@@ -77,10 +81,17 @@
         state.capacity.flush()
         if (state.capacity.isEmpty() || cause != null) tryTerminate()
         resumeClosed(cause)
+
+        if (state === ReadWriteBufferState.Terminated) {
+            delegatedTo?.close(cause)
+        }
+
         return true
     }
 
     override fun flush() {
+        delegatedTo?.let { delegated -> delegated.flush(); return }
+
         if (!state.capacity.flush()) return
         resumeReadOp()
         if (availableForWrite > 0) resumeWriteOp()
@@ -98,20 +109,36 @@
         position(position)
     }
 
-    private fun setupStateForWrite(): ByteBuffer {
+    private fun setupStateForWrite(): ByteBuffer? {
         var _allocated: ReadWriteBufferState.Initial? = null
         val (old, newState) = updateState { state ->
             when (state) {
                 ReadWriteBufferState.IdleEmpty -> {
-                    val allocated = _allocated ?: newBuffer().also { _allocated = it }
-                    allocated.startWriting()
+                    if (delegatedTo != null) {
+                        ReadWriteBufferState.Terminated
+                    } else {
+                        val allocated = _allocated ?: newBuffer().also { _allocated = it }
+                        allocated.startWriting()
+                    }
                 }
-                ReadWriteBufferState.Terminated -> throw closed!!.sendException
+                ReadWriteBufferState.Terminated -> {
+                    if (delegatedTo != null) {
+                        return null
+                    }
+
+                    throw closed!!.sendException
+                }
                 else -> {
                     state.startWriting()
                 }
             }
         }
+
+        if (delegatedTo != null && state === ReadWriteBufferState.Terminated) {
+            ReadOp.getAndSet(this, null)?.resume(false)
+            return null
+        }
+
         val buffer = newState.writeBuffer
 
         _allocated?.let { allocated ->
@@ -145,7 +172,7 @@
     private fun setupStateForRead(): ByteBuffer? {
         val (_, newState) = updateState { state ->
             when (state) {
-                ReadWriteBufferState.Terminated -> closed!!.cause?.let { throw it } ?: return null
+                ReadWriteBufferState.Terminated -> closed?.cause?.let { throw it } ?: return null
                 ReadWriteBufferState.IdleEmpty -> closed?.cause?.let { throw it } ?: return null
                 else -> {
                     if (state.capacity.availableForRead == 0) return null
@@ -189,18 +216,40 @@
         }
     }
 
-    private fun tryTerminate() {
-        val closed = closed ?: return
+    private fun setupDelegateTo(delegate: ByteBufferChannel, delegateClose: Boolean) {
+        require(this !== delegate)
+
+        this.delegateClose = delegateClose
+        this.delegatedTo = delegate
+        resumeWriteOp()
+
+        val alreadyClosed = closed
+        if (alreadyClosed != null) {
+            if (alreadyClosed.cause != null) delegate.close(alreadyClosed.cause)
+            else if (delegateClose) delegate.close()
+            else delegate.flush()
+        } else {
+            flush()
+        }
+    }
+
+    private fun tryTerminate(): Boolean {
+        val closed = closed
+        val delegate = delegatedTo
+
+        if (closed == null && delegate == null) return false
+
         var toRelease: ReadWriteBufferState.Initial? = null
 
         updateState { state ->
             when {
+                state === ReadWriteBufferState.Terminated -> return true
                 state === ReadWriteBufferState.IdleEmpty -> ReadWriteBufferState.Terminated
-                closed.cause != null && state is ReadWriteBufferState.IdleNonEmpty -> {
+                closed?.cause != null && state is ReadWriteBufferState.IdleNonEmpty -> {
                     toRelease = state.initial
                     ReadWriteBufferState.Terminated
                 }
-                else -> return
+                else -> return false
             }
         }
 
@@ -210,24 +259,46 @@
             }
         }
 
-        WriteOp.getAndSet(this, null)?.resumeWithException(closed.sendException)
-        ReadOp.getAndSet(this, null)?.apply {
-            if (closed.cause != null) resumeWithException(closed.cause) else resume(false)
+        if (closed != null) {
+            WriteOp.getAndSet(this, null)?.resumeWithException(closed.sendException)
+            ReadOp.getAndSet(this, null)?.apply {
+                if (closed.cause != null) resumeWithException(closed.cause) else resume(false)
+            }
+        } else if (delegate != null) {
+            WriteOp.getAndSet(this, null)?.resumeWithException(IllegalStateException("Delegation preparation is in progress"))
+            ReadOp.getAndSet(this, null)?.resume(false) // send EOF to delegator copy loop
         }
+
+        return true
     }
 
     private fun ByteBuffer.carryIndex(idx: Int) = if (idx >= capacity() - reservedSize) idx - (capacity() - reservedSize) else idx
 
-    private inline fun writing(block: ByteBuffer.(RingBufferCapacity) -> Unit) {
-        val buffer = setupStateForWrite()
-        val capacity = state.capacity
+    private inline fun writing(block: ByteBufferChannel.(ByteBuffer, RingBufferCapacity) -> Unit) {
+        val buffer: ByteBuffer
+        val capacity: RingBufferCapacity
+
+        var current = this
+        while (true) {
+            val bb = current.setupStateForWrite()
+            if (bb == null && current.state === ReadWriteBufferState.Terminated) {
+                current = current.delegatedTo!!
+            } else if (bb == null) {
+                return
+            } else {
+                buffer = bb
+                capacity = current.state.capacity
+                break
+            }
+        }
+
         try {
-            closed?.let { throw it.sendException }
-            block(buffer, capacity)
+            current.closed?.let { throw it.sendException }
+            block(current, buffer, capacity)
         } finally {
-            if (capacity.isFull() || autoFlush) flush()
-            restoreStateAfterWrite()
-            tryTerminate()
+            if (capacity.isFull() || current.autoFlush) current.flush()
+            current.restoreStateAfterWrite()
+            current.tryTerminate()
         }
     }
 
@@ -677,8 +748,20 @@
         resumeWriteOp()
     }
 
+    private suspend fun <R> delegation(delegate: ByteBufferChannel, block: suspend (ByteBufferChannel) -> R): R {
+        restoreStateAfterWrite()
+
+        while (true) {
+            if (tryTerminate()) {
+                return block(delegate)
+            } else {
+                writeSuspend(Int.MAX_VALUE) // wait for delegator loop complete
+            }
+        }
+    }
+
     suspend override fun writeByte(b: Byte) {
-        val buffer = setupStateForWrite()
+        val buffer = setupStateForWrite() ?: return delegatedTo!!.writeByte(b)
         val c = state.capacity
 
         return tryWriteByte(buffer, b, c)
@@ -708,12 +791,16 @@
             throw t
         }
 
+        delegatedTo?.let { delegate ->
+            return delegation(delegate) { it.writeByte(b) }
+        }
+
         buffer.prepareBuffer(writeByteOrder, writePosition, c.availableForWrite)
-        tryWriteByte(buffer, b, c)
+        return tryWriteByte(buffer, b, c)
     }
 
     suspend override fun writeShort(s: Short) {
-        val buffer = setupStateForWrite()
+        val buffer = setupStateForWrite() ?: return delegatedTo!!.writeShort(s)
         val c = state.capacity
 
         return tryWriteShort(buffer, s, c)
@@ -753,8 +840,12 @@
             throw t
         }
 
+        delegatedTo?.let { delegate ->
+            return delegation(delegate) { it.writeShort(s) }
+        }
+
         buffer.prepareBuffer(writeByteOrder, writePosition, c.availableForWrite)
-        tryWriteShort(buffer, s, c)
+        return tryWriteShort(buffer, s, c)
     }
 
     private fun ByteBuffer.tryWriteInt(i: Int, c: RingBufferCapacity): Boolean {
@@ -778,7 +869,7 @@
     }
 
     suspend override fun writeInt(i: Int) {
-        val buffer = setupStateForWrite()
+        val buffer = setupStateForWrite() ?: return delegatedTo!!.writeInt(i)
         val c = state.capacity
 
         if (!buffer.tryWriteInt(i, c)) {
@@ -795,9 +886,13 @@
             throw t
         }
 
+        delegatedTo?.let { delegate ->
+            return delegation(delegate) { it.writeInt(i) }
+        }
+
         prepareBuffer(writeByteOrder, writePosition, c.availableForWrite)
         if (!tryWriteInt(i, c)) {
-            writeIntSuspend(i, c)
+            return writeIntSuspend(i, c)
         }
     }
 
@@ -822,7 +917,7 @@
     }
 
     suspend override fun writeLong(l: Long) {
-        val buffer = setupStateForWrite()
+        val buffer = setupStateForWrite() ?: return delegatedTo!!.writeLong(l)
         val c = state.capacity
 
         if (!buffer.tryWriteLong(l, c)) {
@@ -831,11 +926,21 @@
     }
 
     private tailrec suspend fun ByteBuffer.writeLongSuspend(l: Long, c: RingBufferCapacity) {
-        writeSuspend(8)
+        try {
+            writeSuspend(8)
+        } catch (t: Throwable) {
+            restoreStateAfterWrite()
+            tryTerminate()
+            throw t
+        }
+
+        delegatedTo?.let { delegate ->
+            return delegation(delegate) { it.writeLong(l) }
+        }
 
         prepareBuffer(writeByteOrder, writePosition, c.availableForWrite)
         if (!tryWriteLong(l, c)) {
-            writeLongSuspend(l, c)
+            return writeLongSuspend(l, c)
         }
     }
 
@@ -863,7 +968,12 @@
 
     private suspend fun writeAvailableSuspend(src: ByteBuffer): Int {
         while (true) {
-            writeSuspend(1)
+            writeSuspend(1) // here we don't need to restoreStateAfterWrite as write copy loop doesn't hold state
+
+            delegatedTo?.let { delegate ->
+                return delegation(delegate) { it.writeAvailable(src) }
+            }
+
             val copied = writeAvailable(src)
             if (copied > 0) return copied
         }
@@ -872,6 +982,11 @@
     private suspend fun writeAvailableSuspend(src: BufferView): Int {
         while (true) {
             writeSuspend(1)
+
+            delegatedTo?.let { delegate ->
+                return delegation(delegate) { it.writeAvailable(src) }
+            }
+
             val copied = writeAvailable(src)
             if (copied > 0) return copied
         }
@@ -894,6 +1009,11 @@
     private suspend fun writeFullySuspend(src: ByteBuffer) {
         while (src.hasRemaining()) {
             writeSuspend(1)
+
+            delegatedTo?.let { delegate ->
+                return delegation(delegate) { it.writeFully(src) }
+            }
+
             writeAsMuchAsPossible(src)
         }
     }
@@ -901,28 +1021,43 @@
     private suspend fun writeFullySuspend(src: BufferView) {
         while (src.canRead()) {
             writeSuspend(1)
+
+            delegatedTo?.let { delegate ->
+                return delegation(delegate) { it.writeFully(src) }
+            }
+
             writeAsMuchAsPossible(src)
         }
     }
 
-    internal suspend fun copyDirect(src: ByteBufferChannel, limit: Long = Long.MAX_VALUE): Long {
+    internal suspend fun joinFrom(src: ByteBufferChannel, delegateClose: Boolean) {
+        src.setupDelegateTo(this, delegateClose)
+        copyDirect(src, Long.MAX_VALUE, leaveOnDelegation = true)
+
+        if (delegateClose && src.isClosedForRead) {
+            close()
+        } else {
+            flush()
+        }
+    }
+
+    internal suspend fun copyDirect(src: ByteBufferChannel, limit: Long, leaveOnDelegation: Boolean): Long {
         if (limit == 0L || src.isClosedForRead) return 0L
         val autoFlush = autoFlush
         val byteOrder = writeByteOrder
 
         try {
             var copied = 0L
-            writing { state ->
-                val dstBuffer = this
-
+            writing { dstBuffer, state ->
                 while (copied < limit) {
                     var avWBefore = state.availableForWrite
                     if (avWBefore == 0) {
                         writeSuspend(1)
+                        if (delegatedTo != null) break
                         avWBefore = state.availableForWrite
                     }
 
-                    prepareBuffer(byteOrder, writePosition, avWBefore)
+                    dstBuffer.prepareBuffer(byteOrder, writePosition, avWBefore)
 
                     var partSize = 0
 
@@ -948,7 +1083,7 @@
                     }
 
                     if (rc) {
-                        bytesWritten(state, partSize)
+                        dstBuffer.bytesWritten(state, partSize)
                         copied += partSize
 
                         if (avWBefore - partSize == 0 || autoFlush) {
@@ -958,6 +1093,15 @@
                         if (src.isClosedForRead) break
 
                         flush()
+
+                        if (leaveOnDelegation) {
+                            if (src.tryShutdownForDelegation()) break
+                            else if (src.state.capacity.flush()) {
+                                src.resumeWriteOp()
+                                continue
+                            }
+                        }
+
                         if (!src.readSuspend(1)) break
                     }
                 }
@@ -967,6 +1111,10 @@
                 flush()
             }
 
+            delegatedTo?.let { delegate ->
+                return copied + delegate.copyDirect(src, limit - copied, leaveOnDelegation)
+            }
+
             return copied
         } catch (t: Throwable) {
             close(t)
@@ -974,29 +1122,41 @@
         }
     }
 
+    private fun tryShutdownForDelegation(): Boolean {
+        if (delegatedTo == null) return false
+
+        updateState { before ->
+            if (before === ReadWriteBufferState.Terminated) return true
+            if (before === ReadWriteBufferState.IdleEmpty) ReadWriteBufferState.Terminated
+            else return false
+        }
+
+        return true
+    }
+
     private fun writeAsMuchAsPossible(src: ByteBuffer): Int {
-        writing {
+        writing { dst, state ->
             var written = 0
             val srcLimit = src.limit()
 
             do {
                 val srcRemaining = srcLimit - src.position()
                 if (srcRemaining == 0) break
-                val possibleSize = it.tryWriteAtMost(minOf(srcRemaining, remaining()))
+                val possibleSize = state.tryWriteAtMost(minOf(srcRemaining, dst.remaining()))
                 if (possibleSize == 0) break
                 require(possibleSize > 0)
 
                 src.limit(src.position() + possibleSize)
-                put(src)
+                dst.put(src)
 
                 written += possibleSize
 
-                prepareBuffer(writeByteOrder, carryIndex(writePosition + written), it.availableForWrite)
+                dst.prepareBuffer(writeByteOrder, dst.carryIndex(writePosition + written), state.availableForWrite)
             } while (true)
 
             src.limit(srcLimit)
 
-            bytesWritten(it, written)
+            dst.bytesWritten(state, written)
 
             return written
         }
@@ -1005,22 +1165,22 @@
     }
 
     private fun writeAsMuchAsPossible(src: BufferView): Int {
-        writing {
+        writing { dst, state ->
             var written = 0
 
             do {
                 val srcSize = src.readRemaining
-                val possibleSize = it.tryWriteAtMost(minOf(srcSize, remaining()))
+                val possibleSize = state.tryWriteAtMost(minOf(srcSize, dst.remaining()))
                 if (possibleSize == 0) break
 
-                src.read(this, possibleSize)
+                src.read(dst, possibleSize)
 
                 written += possibleSize
 
-                prepareBuffer(writeByteOrder, carryIndex(writePosition + written), it.availableForWrite)
+                dst.prepareBuffer(writeByteOrder, dst.carryIndex(writePosition + written), state.availableForWrite)
             } while (true)
 
-            bytesWritten(it, written)
+            dst.bytesWritten(state, written)
 
             return written
         }
@@ -1029,21 +1189,21 @@
     }
 
     private fun writeAsMuchAsPossible(src: ByteArray, offset: Int, length: Int): Int {
-        writing {
+        writing { dst, state ->
             var written = 0
 
             do {
-                val possibleSize = it.tryWriteAtMost(minOf(length - written, remaining()))
+                val possibleSize = state.tryWriteAtMost(minOf(length - written, dst.remaining()))
                 if (possibleSize == 0) break
                 require(possibleSize > 0)
 
-                put(src, offset + written, possibleSize)
+                dst.put(src, offset + written, possibleSize)
                 written += possibleSize
 
-                prepareBuffer(writeByteOrder, carryIndex(writePosition + written), it.availableForWrite)
+                dst.prepareBuffer(writeByteOrder, dst.carryIndex(writePosition + written), state.availableForWrite)
             } while (true)
 
-            bytesWritten(it, written)
+            dst.bytesWritten(state, written)
 
             return written
         }
@@ -1083,6 +1243,11 @@
     private suspend fun writeSuspend(src: ByteArray, offset: Int, length: Int): Int {
         while (true) {
             writeSuspend(1)
+
+            delegatedTo?.let { delegate ->
+                return delegation(delegate) { it.writeAvailable(src, offset, length) }
+            }
+
             val size = writeAsMuchAsPossible(src, offset, length)
             if (size > 0) return size
         }
@@ -1093,17 +1258,17 @@
 
         var written = false
 
-        writing {
-            if (it.availableForWrite >= min) {
-                val position = this.position()
-                val l = this.limit()
-                block(this)
-                if (l != this.limit()) throw IllegalStateException("buffer limit modified")
-                val delta = position() - position
+        writing { dst, state ->
+            if (state.availableForWrite >= min) {
+                val position = dst.position()
+                val l = dst.limit()
+                block(dst)
+                if (l != dst.limit()) throw IllegalStateException("buffer limit modified")
+                val delta = dst.position() - position
                 if (delta < 0) throw IllegalStateException("position has been moved backward: pushback is not supported")
 
-                if (!it.tryWriteExact(delta)) throw IllegalStateException()
-                bytesWritten(it, delta)
+                if (!state.tryWriteExact(delta)) throw IllegalStateException()
+                dst.bytesWritten(state, delta)
                 written = true
             }
         }
@@ -1115,6 +1280,9 @@
 
     private suspend fun writeBlockSuspend(min: Int, block: (ByteBuffer) -> Unit) {
         writeSuspend(min)
+        delegatedTo?.let { delegate ->
+            return delegation(delegate) { it.write(min, block) }
+        }
         write(min, block)
     }
 
@@ -1166,6 +1334,9 @@
         try {
             while (!packet.isEmpty) {
                 writeSuspend(1)
+                delegatedTo?.let { delegate ->
+                    return delegation(delegate) { it.writePacket(packet) }
+                }
                 tryWritePacketPart(packet)
             }
         } finally {
@@ -1175,12 +1346,12 @@
 
     private fun tryWritePacketPart(packet: ByteReadPacket): Int {
         var copied = 0
-        writing {
-            val size = it.tryWriteAtMost(minOf(packet.remaining, remaining()))
+        writing { dst, state ->
+            val size = state.tryWriteAtMost(minOf(packet.remaining, dst.remaining()))
             if (size > 0) {
-                limit(position() + size)
-                packet.readFully(this)
-                bytesWritten(it, size)
+                dst.limit(dst.position() + size)
+                packet.readFully(dst)
+                dst.bytesWritten(state, size)
             }
             copied = size
         }
@@ -1548,15 +1719,15 @@
     }
 
     private suspend fun writeSuspend(size: Int) {
-        while (state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty) {
+        while (state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty && delegatedTo == null) {
             suspendCancellableCoroutine<Unit>(holdCancellability = true) { c ->
                 do {
                     closed?.sendException?.let { throw it }
-                    if (state.capacity.availableForWrite >= size || state === ReadWriteBufferState.IdleEmpty) {
+                    if (state.capacity.availableForWrite >= size || state === ReadWriteBufferState.IdleEmpty || delegatedTo != null) {
                         c.resume(Unit)
                         break
                     }
-                } while (!setContinuation({ writeOp }, WriteOp, c, { closed == null && state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty }))
+                } while (!setContinuation({ writeOp }, WriteOp, c, { closed == null && state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty && delegatedTo == null }))
 
                 flush()
             }
diff --git a/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt b/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt
index 216ecdd..79c6b5e 100644
--- a/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt
+++ b/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt
@@ -137,6 +137,26 @@
     suspend fun read(min: Int = 1, block: (ByteBuffer) -> Unit)
 }
 
+suspend fun ByteReadChannel.joinTo(dst: ByteWriteChannel, closeOnEnd: Boolean) {
+    require(dst !== this)
+
+    if (this is ByteBufferChannel && dst is ByteBufferChannel) {
+        return dst.joinFrom(this, closeOnEnd)
+    }
+
+    return joinToImpl(dst, closeOnEnd)
+}
+
+private suspend fun ByteReadChannel.joinToImpl(dst: ByteWriteChannel, close: Boolean) {
+    if (close) {
+        copyToImpl(dst, Long.MAX_VALUE)
+        dst.flush()
+    } else {
+        copyToImpl(dst, Long.MAX_VALUE)
+        dst.close()
+    }
+}
+
 /**
  * Reads up to [limit] bytes from receiver channel and writes them to [dst] channel.
  * Closes [dst] channel if fails to read or write with cause exception.
@@ -147,7 +167,7 @@
     require(limit >= 0L)
 
     if (this is ByteBufferChannel && dst is ByteBufferChannel) {
-        return dst.copyDirect(this, limit)
+        return dst.copyDirect(this, limit, false)
     }
 
     return copyToImpl(dst, limit)
diff --git a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt
index 910408e..e383197 100644
--- a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt
+++ b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt
@@ -5,7 +5,7 @@
 import kotlinx.coroutines.experimental.launch
 import kotlinx.coroutines.experimental.runBlocking
 import kotlinx.coroutines.experimental.yield
-import org.junit.Test
+import org.junit.*
 import java.io.IOException
 import kotlin.test.assertEquals
 import kotlin.test.fail
@@ -13,6 +13,11 @@
 class ByteBufferChannelScenarioTest : TestBase() {
     private val ch = ByteBufferChannel(true)
 
+    @After
+    fun finish() {
+        ch.close(InterruptedException())
+    }
+
     @Test
     fun testReadBeforeAvailable() {
         expect(1)
diff --git a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt
index 09591c2..99f191e 100644
--- a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt
+++ b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt
@@ -7,8 +7,7 @@
 import kotlinx.coroutines.experimental.io.packet.ByteReadPacket
 import kotlinx.io.core.*
 import kotlinx.io.pool.*
-import org.junit.Rule
-import org.junit.Test
+import org.junit.*
 import org.junit.rules.ErrorCollector
 import org.junit.rules.Timeout
 import java.nio.CharBuffer
@@ -40,6 +39,11 @@
     private val Size = BUFFER_SIZE - RESERVED_SIZE
     private val ch = ByteBufferChannel(autoFlush = false, pool = pool)
 
+    @After
+    fun finish() {
+        ch.close(InterruptedException())
+    }
+
     @Test
     fun testBoolean() {
         runBlocking {
@@ -647,7 +651,8 @@
         val dest = ByteBufferChannel(true, pool)
 
         launch {
-            ch.copyAndClose(dest)
+//            ch.copyAndClose(dest)
+           ch.joinTo(dest, true)
         }
 
         val reader = launch {
@@ -671,6 +676,8 @@
 
         runBlocking {
             reader.join()
+            dest.close()
+            ch.close()
         }
     }
 
@@ -721,6 +728,30 @@
         }
     }
 
+    @Test
+    fun testJoinToSmokeTest() = runBlocking {
+        val other = ByteBufferChannel(autoFlush = false, pool = pool)
+        ch.joinTo(other, false)
+
+        ch.writeInt(0x11223344)
+        ch.flush()
+        assertEquals(0x11223344, other.readInt())
+    }
+
+    @Test
+    fun testJoinToAfterWrite() = runBlocking {
+        val other = ByteBufferChannel(autoFlush = false, pool = pool)
+
+        ch.writeInt(0x12345678)
+        ch.joinTo(other, false)
+
+        ch.writeInt(0x11223344)
+        ch.flush()
+
+        assertEquals(0x12345678, other.readInt())
+        assertEquals(0x11223344, other.readInt())
+    }
+
     private inline fun buildPacket(block: ByteWritePacket.() -> Unit): ByteReadPacket {
         val builder = BytePacketBuilder(0, pktPool)
         try {