IO: introduce ByteChannel.joinTo
diff --git a/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt b/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
index b2d0ce9..f82aec8 100644
--- a/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
+++ b/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
@@ -38,6 +38,10 @@
private var closed: ClosedElement? = null
@Volatile
+ private var delegatedTo: ByteBufferChannel? = null
+ private var delegateClose: Boolean = false
+
+ @Volatile
private var readOp: CancellableContinuation<Boolean>? = null
@Volatile
@@ -77,10 +81,17 @@
state.capacity.flush()
if (state.capacity.isEmpty() || cause != null) tryTerminate()
resumeClosed(cause)
+
+ if (state === ReadWriteBufferState.Terminated) {
+ delegatedTo?.close(cause)
+ }
+
return true
}
override fun flush() {
+ delegatedTo?.let { delegated -> delegated.flush(); return }
+
if (!state.capacity.flush()) return
resumeReadOp()
if (availableForWrite > 0) resumeWriteOp()
@@ -98,20 +109,36 @@
position(position)
}
- private fun setupStateForWrite(): ByteBuffer {
+ private fun setupStateForWrite(): ByteBuffer? {
var _allocated: ReadWriteBufferState.Initial? = null
val (old, newState) = updateState { state ->
when (state) {
ReadWriteBufferState.IdleEmpty -> {
- val allocated = _allocated ?: newBuffer().also { _allocated = it }
- allocated.startWriting()
+ if (delegatedTo != null) {
+ ReadWriteBufferState.Terminated
+ } else {
+ val allocated = _allocated ?: newBuffer().also { _allocated = it }
+ allocated.startWriting()
+ }
}
- ReadWriteBufferState.Terminated -> throw closed!!.sendException
+ ReadWriteBufferState.Terminated -> {
+ if (delegatedTo != null) {
+ return null
+ }
+
+ throw closed!!.sendException
+ }
else -> {
state.startWriting()
}
}
}
+
+ if (delegatedTo != null && state === ReadWriteBufferState.Terminated) {
+ ReadOp.getAndSet(this, null)?.resume(false)
+ return null
+ }
+
val buffer = newState.writeBuffer
_allocated?.let { allocated ->
@@ -145,7 +172,7 @@
private fun setupStateForRead(): ByteBuffer? {
val (_, newState) = updateState { state ->
when (state) {
- ReadWriteBufferState.Terminated -> closed!!.cause?.let { throw it } ?: return null
+ ReadWriteBufferState.Terminated -> closed?.cause?.let { throw it } ?: return null
ReadWriteBufferState.IdleEmpty -> closed?.cause?.let { throw it } ?: return null
else -> {
if (state.capacity.availableForRead == 0) return null
@@ -189,18 +216,40 @@
}
}
- private fun tryTerminate() {
- val closed = closed ?: return
+ private fun setupDelegateTo(delegate: ByteBufferChannel, delegateClose: Boolean) {
+ require(this !== delegate)
+
+ this.delegateClose = delegateClose
+ this.delegatedTo = delegate
+ resumeWriteOp()
+
+ val alreadyClosed = closed
+ if (alreadyClosed != null) {
+ if (alreadyClosed.cause != null) delegate.close(alreadyClosed.cause)
+ else if (delegateClose) delegate.close()
+ else delegate.flush()
+ } else {
+ flush()
+ }
+ }
+
+ private fun tryTerminate(): Boolean {
+ val closed = closed
+ val delegate = delegatedTo
+
+ if (closed == null && delegate == null) return false
+
var toRelease: ReadWriteBufferState.Initial? = null
updateState { state ->
when {
+ state === ReadWriteBufferState.Terminated -> return true
state === ReadWriteBufferState.IdleEmpty -> ReadWriteBufferState.Terminated
- closed.cause != null && state is ReadWriteBufferState.IdleNonEmpty -> {
+ closed?.cause != null && state is ReadWriteBufferState.IdleNonEmpty -> {
toRelease = state.initial
ReadWriteBufferState.Terminated
}
- else -> return
+ else -> return false
}
}
@@ -210,24 +259,46 @@
}
}
- WriteOp.getAndSet(this, null)?.resumeWithException(closed.sendException)
- ReadOp.getAndSet(this, null)?.apply {
- if (closed.cause != null) resumeWithException(closed.cause) else resume(false)
+ if (closed != null) {
+ WriteOp.getAndSet(this, null)?.resumeWithException(closed.sendException)
+ ReadOp.getAndSet(this, null)?.apply {
+ if (closed.cause != null) resumeWithException(closed.cause) else resume(false)
+ }
+ } else if (delegate != null) {
+ WriteOp.getAndSet(this, null)?.resumeWithException(IllegalStateException("Delegation preparation is in progress"))
+ ReadOp.getAndSet(this, null)?.resume(false) // send EOF to delegator copy loop
}
+
+ return true
}
private fun ByteBuffer.carryIndex(idx: Int) = if (idx >= capacity() - reservedSize) idx - (capacity() - reservedSize) else idx
- private inline fun writing(block: ByteBuffer.(RingBufferCapacity) -> Unit) {
- val buffer = setupStateForWrite()
- val capacity = state.capacity
+ private inline fun writing(block: ByteBufferChannel.(ByteBuffer, RingBufferCapacity) -> Unit) {
+ val buffer: ByteBuffer
+ val capacity: RingBufferCapacity
+
+ var current = this
+ while (true) {
+ val bb = current.setupStateForWrite()
+ if (bb == null && current.state === ReadWriteBufferState.Terminated) {
+ current = current.delegatedTo!!
+ } else if (bb == null) {
+ return
+ } else {
+ buffer = bb
+ capacity = current.state.capacity
+ break
+ }
+ }
+
try {
- closed?.let { throw it.sendException }
- block(buffer, capacity)
+ current.closed?.let { throw it.sendException }
+ block(current, buffer, capacity)
} finally {
- if (capacity.isFull() || autoFlush) flush()
- restoreStateAfterWrite()
- tryTerminate()
+ if (capacity.isFull() || current.autoFlush) current.flush()
+ current.restoreStateAfterWrite()
+ current.tryTerminate()
}
}
@@ -677,8 +748,20 @@
resumeWriteOp()
}
+ private suspend fun <R> delegation(delegate: ByteBufferChannel, block: suspend (ByteBufferChannel) -> R): R {
+ restoreStateAfterWrite()
+
+ while (true) {
+ if (tryTerminate()) {
+ return block(delegate)
+ } else {
+ writeSuspend(Int.MAX_VALUE) // wait for delegator loop complete
+ }
+ }
+ }
+
suspend override fun writeByte(b: Byte) {
- val buffer = setupStateForWrite()
+ val buffer = setupStateForWrite() ?: return delegatedTo!!.writeByte(b)
val c = state.capacity
return tryWriteByte(buffer, b, c)
@@ -708,12 +791,16 @@
throw t
}
+ delegatedTo?.let { delegate ->
+ return delegation(delegate) { it.writeByte(b) }
+ }
+
buffer.prepareBuffer(writeByteOrder, writePosition, c.availableForWrite)
- tryWriteByte(buffer, b, c)
+ return tryWriteByte(buffer, b, c)
}
suspend override fun writeShort(s: Short) {
- val buffer = setupStateForWrite()
+ val buffer = setupStateForWrite() ?: return delegatedTo!!.writeShort(s)
val c = state.capacity
return tryWriteShort(buffer, s, c)
@@ -753,8 +840,12 @@
throw t
}
+ delegatedTo?.let { delegate ->
+ return delegation(delegate) { it.writeShort(s) }
+ }
+
buffer.prepareBuffer(writeByteOrder, writePosition, c.availableForWrite)
- tryWriteShort(buffer, s, c)
+ return tryWriteShort(buffer, s, c)
}
private fun ByteBuffer.tryWriteInt(i: Int, c: RingBufferCapacity): Boolean {
@@ -778,7 +869,7 @@
}
suspend override fun writeInt(i: Int) {
- val buffer = setupStateForWrite()
+ val buffer = setupStateForWrite() ?: return delegatedTo!!.writeInt(i)
val c = state.capacity
if (!buffer.tryWriteInt(i, c)) {
@@ -795,9 +886,13 @@
throw t
}
+ delegatedTo?.let { delegate ->
+ return delegation(delegate) { it.writeInt(i) }
+ }
+
prepareBuffer(writeByteOrder, writePosition, c.availableForWrite)
if (!tryWriteInt(i, c)) {
- writeIntSuspend(i, c)
+ return writeIntSuspend(i, c)
}
}
@@ -822,7 +917,7 @@
}
suspend override fun writeLong(l: Long) {
- val buffer = setupStateForWrite()
+ val buffer = setupStateForWrite() ?: return delegatedTo!!.writeLong(l)
val c = state.capacity
if (!buffer.tryWriteLong(l, c)) {
@@ -831,11 +926,21 @@
}
private tailrec suspend fun ByteBuffer.writeLongSuspend(l: Long, c: RingBufferCapacity) {
- writeSuspend(8)
+ try {
+ writeSuspend(8)
+ } catch (t: Throwable) {
+ restoreStateAfterWrite()
+ tryTerminate()
+ throw t
+ }
+
+ delegatedTo?.let { delegate ->
+ return delegation(delegate) { it.writeLong(l) }
+ }
prepareBuffer(writeByteOrder, writePosition, c.availableForWrite)
if (!tryWriteLong(l, c)) {
- writeLongSuspend(l, c)
+ return writeLongSuspend(l, c)
}
}
@@ -863,7 +968,12 @@
private suspend fun writeAvailableSuspend(src: ByteBuffer): Int {
while (true) {
- writeSuspend(1)
+ writeSuspend(1) // here we don't need to restoreStateAfterWrite as write copy loop doesn't hold state
+
+ delegatedTo?.let { delegate ->
+ return delegation(delegate) { it.writeAvailable(src) }
+ }
+
val copied = writeAvailable(src)
if (copied > 0) return copied
}
@@ -872,6 +982,11 @@
private suspend fun writeAvailableSuspend(src: BufferView): Int {
while (true) {
writeSuspend(1)
+
+ delegatedTo?.let { delegate ->
+ return delegation(delegate) { it.writeAvailable(src) }
+ }
+
val copied = writeAvailable(src)
if (copied > 0) return copied
}
@@ -894,6 +1009,11 @@
private suspend fun writeFullySuspend(src: ByteBuffer) {
while (src.hasRemaining()) {
writeSuspend(1)
+
+ delegatedTo?.let { delegate ->
+ return delegation(delegate) { it.writeFully(src) }
+ }
+
writeAsMuchAsPossible(src)
}
}
@@ -901,28 +1021,43 @@
private suspend fun writeFullySuspend(src: BufferView) {
while (src.canRead()) {
writeSuspend(1)
+
+ delegatedTo?.let { delegate ->
+ return delegation(delegate) { it.writeFully(src) }
+ }
+
writeAsMuchAsPossible(src)
}
}
- internal suspend fun copyDirect(src: ByteBufferChannel, limit: Long = Long.MAX_VALUE): Long {
+ internal suspend fun joinFrom(src: ByteBufferChannel, delegateClose: Boolean) {
+ src.setupDelegateTo(this, delegateClose)
+ copyDirect(src, Long.MAX_VALUE, leaveOnDelegation = true)
+
+ if (delegateClose && src.isClosedForRead) {
+ close()
+ } else {
+ flush()
+ }
+ }
+
+ internal suspend fun copyDirect(src: ByteBufferChannel, limit: Long, leaveOnDelegation: Boolean): Long {
if (limit == 0L || src.isClosedForRead) return 0L
val autoFlush = autoFlush
val byteOrder = writeByteOrder
try {
var copied = 0L
- writing { state ->
- val dstBuffer = this
-
+ writing { dstBuffer, state ->
while (copied < limit) {
var avWBefore = state.availableForWrite
if (avWBefore == 0) {
writeSuspend(1)
+ if (delegatedTo != null) break
avWBefore = state.availableForWrite
}
- prepareBuffer(byteOrder, writePosition, avWBefore)
+ dstBuffer.prepareBuffer(byteOrder, writePosition, avWBefore)
var partSize = 0
@@ -948,7 +1083,7 @@
}
if (rc) {
- bytesWritten(state, partSize)
+ dstBuffer.bytesWritten(state, partSize)
copied += partSize
if (avWBefore - partSize == 0 || autoFlush) {
@@ -958,6 +1093,15 @@
if (src.isClosedForRead) break
flush()
+
+ if (leaveOnDelegation) {
+ if (src.tryShutdownForDelegation()) break
+ else if (src.state.capacity.flush()) {
+ src.resumeWriteOp()
+ continue
+ }
+ }
+
if (!src.readSuspend(1)) break
}
}
@@ -967,6 +1111,10 @@
flush()
}
+ delegatedTo?.let { delegate ->
+ return copied + delegate.copyDirect(src, limit - copied, leaveOnDelegation)
+ }
+
return copied
} catch (t: Throwable) {
close(t)
@@ -974,29 +1122,41 @@
}
}
+ private fun tryShutdownForDelegation(): Boolean {
+ if (delegatedTo == null) return false
+
+ updateState { before ->
+ if (before === ReadWriteBufferState.Terminated) return true
+ if (before === ReadWriteBufferState.IdleEmpty) ReadWriteBufferState.Terminated
+ else return false
+ }
+
+ return true
+ }
+
private fun writeAsMuchAsPossible(src: ByteBuffer): Int {
- writing {
+ writing { dst, state ->
var written = 0
val srcLimit = src.limit()
do {
val srcRemaining = srcLimit - src.position()
if (srcRemaining == 0) break
- val possibleSize = it.tryWriteAtMost(minOf(srcRemaining, remaining()))
+ val possibleSize = state.tryWriteAtMost(minOf(srcRemaining, dst.remaining()))
if (possibleSize == 0) break
require(possibleSize > 0)
src.limit(src.position() + possibleSize)
- put(src)
+ dst.put(src)
written += possibleSize
- prepareBuffer(writeByteOrder, carryIndex(writePosition + written), it.availableForWrite)
+ dst.prepareBuffer(writeByteOrder, dst.carryIndex(writePosition + written), state.availableForWrite)
} while (true)
src.limit(srcLimit)
- bytesWritten(it, written)
+ dst.bytesWritten(state, written)
return written
}
@@ -1005,22 +1165,22 @@
}
private fun writeAsMuchAsPossible(src: BufferView): Int {
- writing {
+ writing { dst, state ->
var written = 0
do {
val srcSize = src.readRemaining
- val possibleSize = it.tryWriteAtMost(minOf(srcSize, remaining()))
+ val possibleSize = state.tryWriteAtMost(minOf(srcSize, dst.remaining()))
if (possibleSize == 0) break
- src.read(this, possibleSize)
+ src.read(dst, possibleSize)
written += possibleSize
- prepareBuffer(writeByteOrder, carryIndex(writePosition + written), it.availableForWrite)
+ dst.prepareBuffer(writeByteOrder, dst.carryIndex(writePosition + written), state.availableForWrite)
} while (true)
- bytesWritten(it, written)
+ dst.bytesWritten(state, written)
return written
}
@@ -1029,21 +1189,21 @@
}
private fun writeAsMuchAsPossible(src: ByteArray, offset: Int, length: Int): Int {
- writing {
+ writing { dst, state ->
var written = 0
do {
- val possibleSize = it.tryWriteAtMost(minOf(length - written, remaining()))
+ val possibleSize = state.tryWriteAtMost(minOf(length - written, dst.remaining()))
if (possibleSize == 0) break
require(possibleSize > 0)
- put(src, offset + written, possibleSize)
+ dst.put(src, offset + written, possibleSize)
written += possibleSize
- prepareBuffer(writeByteOrder, carryIndex(writePosition + written), it.availableForWrite)
+ dst.prepareBuffer(writeByteOrder, dst.carryIndex(writePosition + written), state.availableForWrite)
} while (true)
- bytesWritten(it, written)
+ dst.bytesWritten(state, written)
return written
}
@@ -1083,6 +1243,11 @@
private suspend fun writeSuspend(src: ByteArray, offset: Int, length: Int): Int {
while (true) {
writeSuspend(1)
+
+ delegatedTo?.let { delegate ->
+ return delegation(delegate) { it.writeAvailable(src, offset, length) }
+ }
+
val size = writeAsMuchAsPossible(src, offset, length)
if (size > 0) return size
}
@@ -1093,17 +1258,17 @@
var written = false
- writing {
- if (it.availableForWrite >= min) {
- val position = this.position()
- val l = this.limit()
- block(this)
- if (l != this.limit()) throw IllegalStateException("buffer limit modified")
- val delta = position() - position
+ writing { dst, state ->
+ if (state.availableForWrite >= min) {
+ val position = dst.position()
+ val l = dst.limit()
+ block(dst)
+ if (l != dst.limit()) throw IllegalStateException("buffer limit modified")
+ val delta = dst.position() - position
if (delta < 0) throw IllegalStateException("position has been moved backward: pushback is not supported")
- if (!it.tryWriteExact(delta)) throw IllegalStateException()
- bytesWritten(it, delta)
+ if (!state.tryWriteExact(delta)) throw IllegalStateException()
+ dst.bytesWritten(state, delta)
written = true
}
}
@@ -1115,6 +1280,9 @@
private suspend fun writeBlockSuspend(min: Int, block: (ByteBuffer) -> Unit) {
writeSuspend(min)
+ delegatedTo?.let { delegate ->
+ return delegation(delegate) { it.write(min, block) }
+ }
write(min, block)
}
@@ -1166,6 +1334,9 @@
try {
while (!packet.isEmpty) {
writeSuspend(1)
+ delegatedTo?.let { delegate ->
+ return delegation(delegate) { it.writePacket(packet) }
+ }
tryWritePacketPart(packet)
}
} finally {
@@ -1175,12 +1346,12 @@
private fun tryWritePacketPart(packet: ByteReadPacket): Int {
var copied = 0
- writing {
- val size = it.tryWriteAtMost(minOf(packet.remaining, remaining()))
+ writing { dst, state ->
+ val size = state.tryWriteAtMost(minOf(packet.remaining, dst.remaining()))
if (size > 0) {
- limit(position() + size)
- packet.readFully(this)
- bytesWritten(it, size)
+ dst.limit(dst.position() + size)
+ packet.readFully(dst)
+ dst.bytesWritten(state, size)
}
copied = size
}
@@ -1548,15 +1719,15 @@
}
private suspend fun writeSuspend(size: Int) {
- while (state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty) {
+ while (state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty && delegatedTo == null) {
suspendCancellableCoroutine<Unit>(holdCancellability = true) { c ->
do {
closed?.sendException?.let { throw it }
- if (state.capacity.availableForWrite >= size || state === ReadWriteBufferState.IdleEmpty) {
+ if (state.capacity.availableForWrite >= size || state === ReadWriteBufferState.IdleEmpty || delegatedTo != null) {
c.resume(Unit)
break
}
- } while (!setContinuation({ writeOp }, WriteOp, c, { closed == null && state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty }))
+ } while (!setContinuation({ writeOp }, WriteOp, c, { closed == null && state.capacity.availableForWrite < size && state !== ReadWriteBufferState.IdleEmpty && delegatedTo == null }))
flush()
}
diff --git a/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt b/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt
index 216ecdd..79c6b5e 100644
--- a/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt
+++ b/core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteReadChannel.kt
@@ -137,6 +137,26 @@
suspend fun read(min: Int = 1, block: (ByteBuffer) -> Unit)
}
+suspend fun ByteReadChannel.joinTo(dst: ByteWriteChannel, closeOnEnd: Boolean) {
+ require(dst !== this)
+
+ if (this is ByteBufferChannel && dst is ByteBufferChannel) {
+ return dst.joinFrom(this, closeOnEnd)
+ }
+
+ return joinToImpl(dst, closeOnEnd)
+}
+
+private suspend fun ByteReadChannel.joinToImpl(dst: ByteWriteChannel, close: Boolean) {
+ if (close) {
+ copyToImpl(dst, Long.MAX_VALUE)
+ dst.flush()
+ } else {
+ copyToImpl(dst, Long.MAX_VALUE)
+ dst.close()
+ }
+}
+
/**
* Reads up to [limit] bytes from receiver channel and writes them to [dst] channel.
* Closes [dst] channel if fails to read or write with cause exception.
@@ -147,7 +167,7 @@
require(limit >= 0L)
if (this is ByteBufferChannel && dst is ByteBufferChannel) {
- return dst.copyDirect(this, limit)
+ return dst.copyDirect(this, limit, false)
}
return copyToImpl(dst, limit)
diff --git a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt
index 910408e..e383197 100644
--- a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt
+++ b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelScenarioTest.kt
@@ -5,7 +5,7 @@
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.yield
-import org.junit.Test
+import org.junit.*
import java.io.IOException
import kotlin.test.assertEquals
import kotlin.test.fail
@@ -13,6 +13,11 @@
class ByteBufferChannelScenarioTest : TestBase() {
private val ch = ByteBufferChannel(true)
+ @After
+ fun finish() {
+ ch.close(InterruptedException())
+ }
+
@Test
fun testReadBeforeAvailable() {
expect(1)
diff --git a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt
index 09591c2..99f191e 100644
--- a/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt
+++ b/core/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannelTest.kt
@@ -7,8 +7,7 @@
import kotlinx.coroutines.experimental.io.packet.ByteReadPacket
import kotlinx.io.core.*
import kotlinx.io.pool.*
-import org.junit.Rule
-import org.junit.Test
+import org.junit.*
import org.junit.rules.ErrorCollector
import org.junit.rules.Timeout
import java.nio.CharBuffer
@@ -40,6 +39,11 @@
private val Size = BUFFER_SIZE - RESERVED_SIZE
private val ch = ByteBufferChannel(autoFlush = false, pool = pool)
+ @After
+ fun finish() {
+ ch.close(InterruptedException())
+ }
+
@Test
fun testBoolean() {
runBlocking {
@@ -647,7 +651,8 @@
val dest = ByteBufferChannel(true, pool)
launch {
- ch.copyAndClose(dest)
+// ch.copyAndClose(dest)
+ ch.joinTo(dest, true)
}
val reader = launch {
@@ -671,6 +676,8 @@
runBlocking {
reader.join()
+ dest.close()
+ ch.close()
}
}
@@ -721,6 +728,30 @@
}
}
+ @Test
+ fun testJoinToSmokeTest() = runBlocking {
+ val other = ByteBufferChannel(autoFlush = false, pool = pool)
+ ch.joinTo(other, false)
+
+ ch.writeInt(0x11223344)
+ ch.flush()
+ assertEquals(0x11223344, other.readInt())
+ }
+
+ @Test
+ fun testJoinToAfterWrite() = runBlocking {
+ val other = ByteBufferChannel(autoFlush = false, pool = pool)
+
+ ch.writeInt(0x12345678)
+ ch.joinTo(other, false)
+
+ ch.writeInt(0x11223344)
+ ch.flush()
+
+ assertEquals(0x12345678, other.readInt())
+ assertEquals(0x11223344, other.readInt())
+ }
+
private inline fun buildPacket(block: ByteWritePacket.() -> Unit): ByteReadPacket {
val builder = BytePacketBuilder(0, pktPool)
try {