| package kotlinx.coroutines.experimental.io |
| |
| import kotlinx.coroutines.experimental.* |
| import kotlinx.coroutines.experimental.CancellationException |
| import kotlinx.coroutines.experimental.channels.* |
| import kotlinx.coroutines.experimental.io.internal.* |
| import kotlinx.coroutines.experimental.io.packet.* |
| import kotlinx.coroutines.experimental.io.packet.ByteReadPacket |
| import kotlinx.io.core.* |
| import kotlinx.io.pool.* |
| import org.junit.* |
| import org.junit.Test |
| import org.junit.rules.* |
| import java.nio.CharBuffer |
| import java.util.* |
| import java.util.concurrent.* |
| import java.util.concurrent.atomic.* |
| import kotlin.coroutines.experimental.* |
| import kotlin.test.* |
| |
| class ByteBufferChannelTest : TestBase() { |
| @get:Rule |
| val timeout = Timeout(100L * stressTestMultiplier, TimeUnit.SECONDS) |
| |
| @get:Rule |
| private val failures = ErrorCollector() |
| |
| @get:Rule |
| internal val pool = VerifyingObjectPool(object : NoPoolImpl<ReadWriteBufferState.Initial>() { |
| override fun borrow(): ReadWriteBufferState.Initial { |
| return ReadWriteBufferState.Initial(java.nio.ByteBuffer.allocate(4096)) |
| } |
| }) |
| |
| @get:Rule |
| internal val pktPool = VerifyingObjectPool(BufferView.Pool) |
| |
| private val Size = BUFFER_SIZE - RESERVED_SIZE |
| private val ch = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| @After |
| fun finish() { |
| ch.close(InterruptedException()) |
| } |
| |
| @Test |
| fun testBoolean() { |
| runTest { |
| ch.writeBoolean(true) |
| ch.flush() |
| assertEquals(true, ch.readBoolean()) |
| |
| ch.writeBoolean(false) |
| ch.flush() |
| assertEquals(false, ch.readBoolean()) |
| } |
| } |
| |
| @Test |
| fun testByte() { |
| runTest { |
| assertEquals(0, ch.availableForRead) |
| ch.writeByte(-1) |
| ch.flush() |
| assertEquals(1, ch.availableForRead) |
| assertEquals(-1, ch.readByte()) |
| assertEquals(0, ch.availableForRead) |
| } |
| } |
| |
| @Test |
| fun testShortB() { |
| runTest { |
| ch.readByteOrder = ByteOrder.BIG_ENDIAN |
| ch.writeByteOrder = ByteOrder.BIG_ENDIAN |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeShort(-1) |
| assertEquals(0, ch.availableForRead) |
| ch.flush() |
| assertEquals(2, ch.availableForRead) |
| assertEquals(-1, ch.readShort()) |
| assertEquals(0, ch.availableForRead) |
| } |
| } |
| |
| @Test |
| fun testShortL() { |
| runTest { |
| ch.readByteOrder = ByteOrder.LITTLE_ENDIAN |
| ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeShort(-1) |
| assertEquals(0, ch.availableForRead) |
| ch.flush() |
| assertEquals(2, ch.availableForRead) |
| assertEquals(-1, ch.readShort()) |
| assertEquals(0, ch.availableForRead) |
| } |
| } |
| |
| @Test |
| fun testShortEdge() { |
| runTest { |
| ch.writeByte(1) |
| |
| for (i in 2 until Size step 2) { |
| ch.writeShort(0x00ee) |
| } |
| |
| ch.flush() |
| |
| ch.readByte() |
| ch.writeShort(0x1234) |
| |
| ch.flush() |
| |
| while (ch.availableForRead > 2) { |
| ch.readShort() |
| } |
| |
| assertEquals(0x1234, ch.readShort()) |
| } |
| } |
| |
| @Test |
| fun testIntB() { |
| runTest { |
| ch.readByteOrder = ByteOrder.BIG_ENDIAN |
| ch.writeByteOrder = ByteOrder.BIG_ENDIAN |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeInt(-1) |
| ch.flush() |
| assertEquals(4, ch.availableForRead) |
| assertEquals(-1, ch.readInt()) |
| assertEquals(0, ch.availableForRead) |
| } |
| } |
| |
| @Test |
| fun testIntL() { |
| runTest { |
| ch.readByteOrder = ByteOrder.LITTLE_ENDIAN |
| ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeInt(-1) |
| ch.flush() |
| assertEquals(4, ch.availableForRead) |
| assertEquals(-1, ch.readInt()) |
| assertEquals(0, ch.availableForRead) |
| } |
| } |
| |
| @Test |
| fun testIntEdge() { |
| runTest { |
| for (shift in 1..3) { |
| for (i in 1..shift) { |
| ch.writeByte(1) |
| } |
| |
| repeat(Size / 4 - 1) { |
| ch.writeInt(0xeeeeeeeeL) |
| } |
| |
| ch.flush() |
| |
| for (i in 1..shift) { |
| ch.readByte() |
| } |
| |
| ch.writeInt(0x12345678) |
| |
| ch.flush() |
| |
| while (ch.availableForRead > 4) { |
| ch.readInt() |
| } |
| |
| assertEquals(0x12345678, ch.readInt()) |
| } |
| } |
| } |
| |
| @Test |
| fun testIntEdge2() { |
| runTest { |
| for (shift in 1..3) { |
| for (i in 1..shift) { |
| ch.writeByte(1) |
| } |
| |
| repeat(Size / 4 - 1) { |
| ch.writeInt(0xeeeeeeeeL) |
| } |
| |
| ch.flush() |
| |
| for (i in 1..shift) { |
| ch.readByte() |
| } |
| |
| ch.writeByte(0x12) |
| ch.writeByte(0x34) |
| ch.writeByte(0x56) |
| ch.writeByte(0x78) |
| |
| ch.flush() |
| |
| while (ch.availableForRead > 4) { |
| ch.readInt() |
| } |
| |
| assertEquals(0x12345678, ch.readInt()) |
| } |
| } |
| } |
| |
| |
| @Test |
| fun testLongB() { |
| runTest { |
| ch.readByteOrder = ByteOrder.BIG_ENDIAN |
| ch.writeByteOrder = ByteOrder.BIG_ENDIAN |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeLong(Long.MIN_VALUE) |
| ch.flush() |
| assertEquals(8, ch.availableForRead) |
| assertEquals(Long.MIN_VALUE, ch.readLong()) |
| assertEquals(0, ch.availableForRead) |
| } |
| } |
| |
| @Test |
| fun testLongL() { |
| runTest { |
| ch.readByteOrder = ByteOrder.LITTLE_ENDIAN |
| ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeLong(Long.MIN_VALUE) |
| ch.flush() |
| assertEquals(8, ch.availableForRead) |
| assertEquals(Long.MIN_VALUE, ch.readLong()) |
| assertEquals(0, ch.availableForRead) |
| } |
| } |
| |
| @Test |
| fun testLongEdge() { |
| runTest { |
| for (shift in 1..7) { |
| for (i in 1..shift) { |
| ch.writeByte(1) |
| } |
| |
| repeat(Size / 8 - 1) { |
| ch.writeLong(0x11112222eeeeeeeeL) |
| } |
| |
| ch.flush() |
| for (i in 1..shift) { |
| ch.readByte() |
| } |
| |
| ch.writeLong(0x1234567812345678L) |
| ch.flush() |
| |
| while (ch.availableForRead > 8) { |
| ch.readLong() |
| } |
| |
| assertEquals(0x1234567812345678L, ch.readLong()) |
| } |
| } |
| } |
| |
| @Test |
| fun testDoubleB() { |
| runTest { |
| ch.readByteOrder = ByteOrder.BIG_ENDIAN |
| ch.writeByteOrder = ByteOrder.BIG_ENDIAN |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeDouble(1.05) |
| ch.flush() |
| |
| assertEquals(8, ch.availableForRead) |
| assertEquals(1.05, ch.readDouble()) |
| assertEquals(0, ch.availableForRead) |
| } |
| } |
| |
| @Test |
| fun testDoubleL() { |
| runTest { |
| ch.readByteOrder = ByteOrder.LITTLE_ENDIAN |
| ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeDouble(1.05) |
| ch.flush() |
| |
| assertEquals(8, ch.availableForRead) |
| assertEquals(1.05, ch.readDouble()) |
| assertEquals(0, ch.availableForRead) |
| } |
| } |
| |
| @Test |
| fun testFloatB() { |
| runTest { |
| ch.readByteOrder = ByteOrder.BIG_ENDIAN |
| ch.writeByteOrder = ByteOrder.BIG_ENDIAN |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeFloat(1.05f) |
| ch.flush() |
| |
| assertEquals(4, ch.availableForRead) |
| assertEquals(1.05f, ch.readFloat()) |
| assertEquals(0, ch.availableForRead) |
| } |
| } |
| |
| @Test |
| fun testFloatL() { |
| runTest { |
| ch.readByteOrder = ByteOrder.LITTLE_ENDIAN |
| ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeFloat(1.05f) |
| ch.flush() |
| |
| assertEquals(4, ch.availableForRead) |
| assertEquals(1.05f, ch.readFloat()) |
| assertEquals(0, ch.availableForRead) |
| } |
| } |
| |
| |
| |
| @Test |
| fun testEndianMix() { |
| val byteOrders = listOf(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN) |
| runTest { |
| for (writeOrder in byteOrders) { |
| ch.writeByteOrder = writeOrder |
| |
| for (readOrder in byteOrders) { |
| ch.readByteOrder = readOrder |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeShort(0x001f) |
| ch.flush() |
| if (writeOrder == readOrder) |
| assertEquals(0x001f, ch.readShort()) |
| else |
| assertEquals(0x1f00, ch.readShort()) |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeShort(0x001f) |
| ch.flush() |
| if (writeOrder == readOrder) |
| assertEquals(0x001f, ch.readShort()) |
| else |
| assertEquals(0x1f00, ch.readShort()) |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeInt(0x1f) |
| ch.flush() |
| if (writeOrder == readOrder) |
| assertEquals(0x0000001f, ch.readInt()) |
| else |
| assertEquals(0x1f000000, ch.readInt()) |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeInt(0x1fL) |
| ch.flush() |
| if (writeOrder == readOrder) |
| assertEquals(0x0000001f, ch.readInt()) |
| else |
| assertEquals(0x1f000000, ch.readInt()) |
| |
| assertEquals(0, ch.availableForRead) |
| ch.writeLong(0x1f) |
| ch.flush() |
| if (writeOrder == readOrder) |
| assertEquals(0x1f, ch.readLong()) |
| else |
| assertEquals(0x1f00000000000000L, ch.readLong()) |
| } |
| } |
| } |
| } |
| |
| @Test |
| fun testClose() { |
| runTest { |
| ch.writeByte(1) |
| ch.writeByte(2) |
| ch.writeByte(3) |
| |
| ch.flush() |
| assertEquals(1, ch.readByte()) |
| ch.close() |
| |
| assertEquals(2, ch.readByte()) |
| assertEquals(3, ch.readByte()) |
| |
| try { |
| ch.readByte() |
| fail() |
| } catch (expected: ClosedReceiveChannelException) { |
| } |
| } |
| } |
| |
| @Test |
| fun testReadAndWriteFully() { |
| runTest { |
| val bytes = byteArrayOf(1, 2, 3, 4, 5) |
| val dst = ByteArray(5) |
| |
| ch.writeFully(bytes) |
| ch.flush() |
| assertEquals(5, ch.availableForRead) |
| ch.readFully(dst) |
| assertTrue { dst.contentEquals(bytes) } |
| |
| ch.writeFully(bytes) |
| ch.flush() |
| |
| val dst2 = ByteArray(4) |
| ch.readFully(dst2) |
| |
| assertEquals(1, ch.availableForRead) |
| assertEquals(5, ch.readByte()) |
| |
| ch.close() |
| |
| try { |
| ch.readFully(dst) |
| fail("") |
| } catch (expected: ClosedReceiveChannelException) { |
| } |
| } |
| } |
| |
| @Test |
| fun testReadAndWriteFullyByteBuffer() { |
| runTest { |
| val bytes = byteArrayOf(1, 2, 3, 4, 5) |
| val dst = ByteArray(5) |
| |
| ch.writeFully(ByteBuffer.wrap(bytes)) |
| ch.flush() |
| assertEquals(5, ch.availableForRead) |
| ch.readFully(ByteBuffer.wrap(dst)) |
| assertTrue { dst.contentEquals(bytes) } |
| |
| ch.writeFully(ByteBuffer.wrap(bytes)) |
| ch.flush() |
| |
| val dst2 = ByteArray(4) |
| ch.readFully(ByteBuffer.wrap(dst2)) |
| |
| assertEquals(1, ch.availableForRead) |
| assertEquals(5, ch.readByte()) |
| |
| ch.close() |
| |
| try { |
| ch.readFully(ByteBuffer.wrap(dst)) |
| fail("") |
| } catch (expected: ClosedReceiveChannelException) { |
| } |
| } |
| } |
| |
| @Test |
| fun testReadAndWritePartially() { |
| runTest { |
| val bytes = byteArrayOf(1, 2, 3, 4, 5) |
| |
| assertEquals(5, ch.writeAvailable(bytes)) |
| ch.flush() |
| assertEquals(5, ch.readAvailable(ByteArray(100))) |
| |
| repeat(Size / bytes.size) { |
| assertNotEquals(0, ch.writeAvailable(bytes)) |
| ch.flush() |
| } |
| |
| ch.readAvailable(ByteArray(ch.availableForRead - 1)) |
| assertEquals(1, ch.readAvailable(ByteArray(100))) |
| |
| ch.close() |
| } |
| } |
| |
| @Test |
| fun testReadAndWritePartiallyByteBuffer() { |
| runTest { |
| val bytes = byteArrayOf(1, 2, 3, 4, 5) |
| |
| assertEquals(5, ch.writeAvailable(ByteBuffer.wrap(bytes))) |
| ch.flush() |
| assertEquals(5, ch.readAvailable(ByteBuffer.allocate(100))) |
| |
| repeat(Size / bytes.size) { |
| assertNotEquals(0, ch.writeAvailable(ByteBuffer.wrap(bytes))) |
| ch.flush() |
| } |
| |
| ch.readAvailable(ByteArray(ch.availableForRead - 1)) |
| assertEquals(1, ch.readAvailable(ByteBuffer.allocate(100))) |
| |
| ch.close() |
| } |
| } |
| |
| |
| @Test |
| fun testReadAndWriteBig() { |
| val count = 200 |
| val bytes = ByteArray(65536) |
| Random().nextBytes(bytes) |
| |
| launch(CommonPool + CoroutineName("writer")) { |
| for (i in 1..count) { |
| ch.writeFully(bytes) |
| ch.flush() |
| } |
| }.invokeOnCompletion { t -> |
| if (t != null) { |
| failures.addError(t) |
| } |
| } |
| |
| runTest { |
| val dst = ByteArray(bytes.size) |
| for (i in 1..count) { |
| ch.readFully(dst) |
| assertTrue { dst.contentEquals(bytes) } |
| dst.fill(0) |
| } |
| } |
| } |
| |
| @Test |
| fun testReadAndWriteBigByteBuffer() { |
| val count = 200 |
| val bytes = ByteArray(65536) |
| Random().nextBytes(bytes) |
| |
| launch(CommonPool + CoroutineName("writer")) { |
| for (i in 1..count) { |
| ch.writeFully(ByteBuffer.wrap(bytes)) |
| ch.flush() |
| } |
| }.invokeOnCompletion { t -> |
| if (t != null) { |
| failures.addError(t) |
| } |
| } |
| |
| runTest { |
| val dst = ByteArray(bytes.size) |
| for (i in 1..count) { |
| ch.readFully(ByteBuffer.wrap(dst)) |
| assertTrue { dst.contentEquals(bytes) } |
| dst.fill(0) |
| } |
| } |
| } |
| |
| @Test |
| fun testPacket() = runTest { |
| val packet = buildPacket { |
| writeInt(0xffee) |
| writeStringUtf8("Hello") |
| } |
| |
| ch.writeInt(packet.remaining) |
| ch.writePacket(packet) |
| |
| ch.flush() |
| |
| val size = ch.readInt() |
| val readed = ch.readPacket(size) |
| |
| assertEquals(0xffee, readed.readInt()) |
| assertEquals("Hello", readed.readUTF8Line()) |
| } |
| |
| @Test |
| fun testBigPacket() = runTest { |
| launch(CommonPool + CoroutineName("writer")) { |
| val packet = buildPacket { |
| writeInt(0xffee) |
| writeStringUtf8(".".repeat(8192)) |
| } |
| |
| ch.writeInt(packet.remaining) |
| ch.writePacket(packet) |
| |
| ch.flush() |
| } |
| |
| val size = ch.readInt() |
| val readed = ch.readPacket(size) |
| |
| assertEquals(0xffee, readed.readInt()) |
| assertEquals(".".repeat(8192), readed.readUTF8Line()) |
| } |
| |
| @Test |
| fun testWriteString() = runTest { |
| ch.writeStringUtf8("abc") |
| ch.close() |
| |
| assertEquals("abc", ch.readASCIILine()) |
| } |
| |
| @Test |
| fun testWriteCharSequence() = runTest { |
| ch.writeStringUtf8("abc" as CharSequence) |
| ch.close() |
| |
| assertEquals("abc", ch.readASCIILine()) |
| } |
| |
| @Test |
| fun testWriteCharBuffer() = runTest { |
| val cb = CharBuffer.allocate(6) |
| |
| for (i in 0 until cb.remaining()) { |
| cb.put(i, ' ') |
| } |
| |
| cb.position(2) |
| cb.put(2, 'a') |
| cb.put(3, 'b') |
| cb.put(4, 'c') |
| cb.limit(5) |
| |
| assertEquals("abc", cb.slice().toString()) |
| |
| ch.writeStringUtf8(cb) |
| ch.close() |
| |
| assertEquals("abc", ch.readASCIILine()) |
| } |
| |
| @Test |
| fun testReadAndWriteLarge() = runTest { |
| val count = 128L * 1024 * stressTestMultiplier // * 8192 -> 1G * M |
| val data = ByteBuffer.allocate(8192)!! |
| Random().nextBytes(data.array()) |
| |
| launch("writer") { |
| repeat(count.toInt()) { |
| data.clear() |
| ch.writeFully(data) |
| } |
| ch.close() |
| } |
| |
| launch("reader") { |
| val buffer = ByteBuffer.allocate(8192)!! |
| var read = 0L |
| val total = count * 8192 |
| |
| while (read < total) { |
| buffer.clear() |
| val rc = ch.readFully(buffer) |
| if (rc == -1) break |
| read += rc |
| } |
| |
| assertEquals(total, read) |
| |
| buffer.clear() |
| assertEquals(-1, ch.readAvailable(buffer)) |
| } |
| } |
| |
| @Test |
| fun testReadAndWriteLargeViaLookAheadSession() = runTest { |
| val count = 128L * 1024 * stressTestMultiplier // * 8192 -> 1G * M |
| val data = ByteBuffer.allocate(8192)!! |
| Random().nextBytes(data.array()) |
| |
| launch("writer") { |
| repeat(count.toInt()) { |
| data.clear() |
| ch.writeFully(data) |
| } |
| ch.close() |
| } |
| |
| launch("reader") { |
| var read = 0L |
| val total = count * 8192 |
| |
| ch.lookAheadSuspend { |
| while (read < total) { |
| val bb = request(0, 1) |
| if (bb == null) { |
| if (!awaitAtLeast(1)) break |
| continue |
| } |
| val rc = bb.remaining() |
| bb.position(bb.limit()) |
| read += rc |
| consumed(rc) |
| } |
| } |
| |
| assertEquals(total, read) |
| assertEquals(-1, ch.readAvailable(ByteBuffer.allocate(8192))) |
| } |
| } |
| |
| @Test |
| fun testCopyLarge() { |
| val count = 100 * 256 * stressTestMultiplier // * 8192 |
| |
| launch { |
| val bb = ByteBuffer.allocate(8192) |
| for (i in 0 until bb.capacity()) { |
| bb.put((i and 0xff).toByte()) |
| } |
| |
| for (i in 1..count) { |
| bb.clear() |
| val split = i and 0x1fff |
| |
| bb.limit(split) |
| ch.writeFully(bb) |
| yield() |
| bb.limit(bb.capacity()) |
| ch.writeFully(bb) |
| } |
| |
| ch.close() |
| } |
| |
| val dest = ByteBufferChannel(true, pool) |
| |
| val joinerJob = launch { |
| ch.copyAndClose(dest) |
| } |
| |
| val reader = launch { |
| val bb = ByteBuffer.allocate(8192) |
| |
| for (i in 1..count) { |
| bb.clear() |
| dest.readFully(bb) |
| bb.flip() |
| |
| if (i and 0x1fff == 0) { |
| for (idx in 0 until bb.capacity()) { |
| assertEquals((idx and 0xff).toByte(), bb.get()) |
| } |
| } |
| } |
| |
| yield() |
| assertTrue(dest.isClosedForRead) |
| } |
| |
| runTest { |
| reader.join() |
| joinerJob.join() |
| dest.close() |
| ch.close() |
| } |
| } |
| |
| @Test |
| fun testJoinToLarge() { |
| val count = 100 * 256 * stressTestMultiplier // * 8192 |
| |
| val writerJob = launch { |
| val bb = ByteBuffer.allocate(8192) |
| for (i in 0 until bb.capacity()) { |
| bb.put((i and 0xff).toByte()) |
| } |
| |
| for (i in 1..count) { |
| bb.clear() |
| val split = i and 0x1fff |
| |
| bb.limit(split) |
| ch.writeFully(bb) |
| yield() |
| bb.limit(bb.capacity()) |
| ch.writeFully(bb) |
| } |
| |
| ch.close() |
| } |
| |
| val dest = ByteBufferChannel(true, pool) |
| |
| val joinerJob = launch { |
| ch.joinTo(dest, true) |
| } |
| |
| val reader = launch { |
| val bb = ByteBuffer.allocate(8192) |
| |
| for (i in 1..count) { |
| bb.clear() |
| dest.readFully(bb) |
| bb.flip() |
| |
| if (i and 0x1fff == 0) { |
| for (idx in 0 until bb.capacity()) { |
| assertEquals((idx and 0xff).toByte(), bb.get()) |
| } |
| } |
| } |
| |
| bb.clear() |
| assertEquals(-1, dest.readAvailable(bb)) |
| assertTrue(dest.isClosedForRead) |
| } |
| |
| val latch = CountDownLatch(1) |
| val r = AtomicInteger(3) |
| |
| val handler: CompletionHandler = { t -> |
| t?.let { failures.addError(it); latch.countDown() } |
| if (r.decrementAndGet() == 0) latch.countDown() |
| } |
| |
| reader.invokeOnCompletion(onCancelling = true, handler = handler) |
| writerJob.invokeOnCompletion(onCancelling = true, handler = handler) |
| joinerJob.invokeOnCompletion(onCancelling = true, handler = handler) |
| |
| latch.await() |
| } |
| |
| private suspend fun launch(name: String = "child", block: suspend () -> Unit): Job { |
| return launch(context = DefaultDispatcher + CoroutineName(name), parent = coroutineContext[Job]) { |
| block() |
| }.apply { |
| invokeOnCompletion( onCancelling = true) { t -> |
| if (t != null) ch.cancel(t) |
| } |
| } |
| } |
| |
| private fun launch(block: suspend () -> Unit): Job { |
| return launch(DefaultDispatcher) { |
| try { |
| block() |
| } catch (t: Throwable) { |
| failures.addError(t) |
| } |
| } |
| } |
| |
| @Test |
| fun testStressReadWriteFully() = runTest { |
| val size = 100 |
| val data = ByteArray(size) { it.toByte() } |
| val exec = newFixedThreadPoolContext(8, "testStressReadFully") |
| val buffers = object : DefaultPool<ByteArray>(10) { |
| override fun produceInstance(): ByteArray { |
| return ByteArray(size) |
| } |
| } |
| |
| try { |
| (1..100_000 * stressTestMultiplier).map { |
| async(exec) { |
| val channel = ByteBufferChannel(autoFlush = false, pool = pool) |
| val job = launch(exec) { |
| try { |
| channel.writeFully(data) |
| } finally { |
| channel.close() |
| } |
| } |
| |
| yield() |
| val buffer = buffers.borrow() |
| channel.readFully(buffer) |
| buffers.recycle(buffer) |
| job.cancel() |
| } |
| }.forEach { |
| it.await() |
| } |
| } finally { |
| exec.close() |
| } |
| } |
| |
| @Test |
| fun testJoinToSmokeTest() = runTest { |
| val other = ByteBufferChannel(autoFlush = false, pool = pool) |
| launch(coroutineContext) { |
| ch.joinTo(other, false) |
| } |
| yield() |
| |
| ch.writeInt(0x11223344) |
| ch.flush() |
| assertEquals(0x11223344, other.readInt()) |
| |
| ch.close() |
| } |
| |
| @Test |
| fun testJoinToChainSmokeTest1() = runTest { |
| val A = ByteBufferChannel(autoFlush = false, pool = pool) |
| val B = ByteBufferChannel(autoFlush = false, pool = pool) |
| val C = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| launch(coroutineContext) { |
| B.joinTo(C, closeOnEnd = true) |
| } |
| launch(coroutineContext) { |
| A.joinTo(B, closeOnEnd = true) |
| } |
| |
| yield() |
| A.writeStringUtf8("OK") |
| A.close() |
| |
| assertEquals("OK", C.readUTF8Line()) |
| } |
| |
| @Test |
| fun testJoinToChainSmokeTest2() = runTest { |
| val A = ByteBufferChannel(autoFlush = false, pool = pool) |
| val B = ByteBufferChannel(autoFlush = false, pool = pool) |
| val C = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| launch(coroutineContext) { |
| A.joinTo(B, closeOnEnd = true) |
| } |
| launch(coroutineContext) { |
| B.joinTo(C, closeOnEnd = true) |
| } |
| |
| yield() |
| A.writeStringUtf8("OK") |
| A.close() |
| |
| assertEquals("OK", C.readUTF8Line()) |
| } |
| |
| @Test |
| fun testJoinToChainSmokeTest3() = runTest { |
| val A = ByteBufferChannel(autoFlush = false, pool = pool) |
| val B = ByteBufferChannel(autoFlush = false, pool = pool) |
| val C = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| launch(coroutineContext + CoroutineName("A->B")) { |
| A.joinTo(B, closeOnEnd = true) |
| } |
| launch(coroutineContext + CoroutineName("B->C")) { |
| B.joinTo(C, closeOnEnd = true) |
| } |
| |
| A.writeStringUtf8("OK\n") |
| // A.close() |
| A.flush() |
| yield() |
| yield() |
| yield() |
| A.close() |
| |
| assertEquals("OK", C.readUTF8Line()) |
| } |
| |
| @Test |
| fun testJoinToChainSmokeTest4() = runTest { |
| val A = ByteBufferChannel(autoFlush = false, pool = pool) |
| val B = ByteBufferChannel(autoFlush = false, pool = pool) |
| val C = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| launch(coroutineContext + CoroutineName("A->B")) { |
| A.joinTo(B, closeOnEnd = true) |
| } |
| launch(coroutineContext + CoroutineName("B->C")) { |
| B.joinTo(C, closeOnEnd = true) |
| } |
| |
| A.writeStringUtf8("OK\n") |
| A.close() |
| |
| assertEquals("OK", C.readUTF8Line()) |
| } |
| |
| @Test |
| fun testJoinToFull() = runTest() { |
| val D = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| var written = 0 |
| D.writeByte(1) |
| written++ |
| while (D.availableForWrite > 0) { |
| D.writeByte(1) |
| written++ |
| } |
| |
| ch.writeInt(777) |
| ch.close() |
| |
| launch(coroutineContext) { |
| ch.joinTo(D, true) |
| } |
| |
| yield() |
| |
| repeat(written) { |
| D.readByte() |
| } |
| |
| assertEquals(777, D.readInt()) |
| } |
| |
| @Test |
| fun testJoinToChainNonEmpty() = runTest { |
| val A = ByteBufferChannel(autoFlush = false, pool = pool) |
| val B = ByteBufferChannel(autoFlush = false, pool = pool) |
| val C = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| A.writeStringUtf8("1") |
| A.flush() |
| |
| launch(coroutineContext + CoroutineName("Reader")) { |
| assertEquals("1OK", C.readUTF8Line()) |
| } |
| yield() |
| |
| launch(coroutineContext + CoroutineName("A->B")) { |
| A.joinTo(B, closeOnEnd = true) |
| } |
| |
| yield() |
| |
| launch(coroutineContext + CoroutineName("B->C")) { |
| B.joinTo(C, closeOnEnd = true) |
| } |
| yield() |
| |
| |
| yield() |
| yield() |
| yield() |
| |
| A.writeStringUtf8("OK\n") |
| A.close() |
| } |
| |
| @Test |
| fun testCopyToThenJoinTo() = runTest { |
| val A = ByteBufferChannel(autoFlush = false, pool = pool) |
| val B = ByteBufferChannel(autoFlush = false, pool = pool) |
| val C = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| // A.writeStringUtf8("1") |
| // A.flush() |
| |
| launch(coroutineContext + CoroutineName("A->B")) { |
| A.copyAndClose(B) |
| } |
| |
| launch(coroutineContext + CoroutineName("Reader")) { |
| assertEquals("1OK", C.readUTF8Line()) |
| } |
| |
| launch(coroutineContext + CoroutineName("B->C")) { |
| B.joinTo(C, closeOnEnd = true) |
| } |
| |
| A.writeStringUtf8("1OK\n") |
| A.close() |
| } |
| |
| @Test |
| fun testCopyToThenJoinTo2() = runTest { |
| val A = ByteBufferChannel(autoFlush = false, pool = pool) |
| val B = ByteBufferChannel(autoFlush = false, pool = pool) |
| val C = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| A.writeStringUtf8("1") |
| A.flush() |
| |
| launch(coroutineContext + CoroutineName("A->B")) { |
| A.copyAndClose(B) |
| } |
| |
| launch(coroutineContext + CoroutineName("Reader")) { |
| assertEquals("1OK", C.readUTF8Line()) |
| } |
| |
| launch(coroutineContext + CoroutineName("B->C")) { |
| B.joinTo(C, closeOnEnd = true) |
| } |
| |
| A.writeStringUtf8("OK\n") |
| A.close() |
| } |
| |
| @Test |
| fun testCopyToThenJoinTo3() = runTest { |
| val A = ByteBufferChannel(autoFlush = false, pool = pool) |
| val B = ByteBufferChannel(autoFlush = false, pool = pool) |
| val C = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| A.writeStringUtf8("1") |
| A.flush() |
| |
| launch(coroutineContext + CoroutineName("A->B")) { |
| A.copyAndClose(B) |
| } |
| |
| yield() |
| |
| launch(coroutineContext + CoroutineName("Reader")) { |
| assertEquals("1OK", C.readUTF8Line()) |
| } |
| |
| launch(coroutineContext + CoroutineName("B->C")) { |
| B.joinTo(C, closeOnEnd = true) |
| } |
| |
| A.writeStringUtf8("OK\n") |
| A.close() |
| } |
| |
| @Test |
| fun testCopyToThenJoinTo4() = runTest { |
| val A = ByteBufferChannel(autoFlush = false, pool = pool) |
| val B = ByteBufferChannel(autoFlush = false, pool = pool) |
| val C = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| A.writeStringUtf8("1") |
| A.flush() |
| |
| launch(coroutineContext + CoroutineName("Reader")) { |
| assertEquals("1OK", C.readUTF8Line()) |
| } |
| |
| yield() |
| |
| launch(coroutineContext + CoroutineName("A->B")) { |
| A.copyAndClose(B) |
| } |
| |
| launch(coroutineContext + CoroutineName("B->C")) { |
| B.joinTo(C, closeOnEnd = true) |
| } |
| |
| A.writeStringUtf8("OK\n") |
| A.close() |
| } |
| |
| @Test |
| fun testCopyToThenJoinTo5() = runTest { |
| val A = ByteBufferChannel(autoFlush = false, pool = pool) |
| val B = ByteBufferChannel(autoFlush = false, pool = pool) |
| val C = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| A.writeStringUtf8("1") |
| A.flush() |
| |
| launch(coroutineContext + CoroutineName("B->C")) { |
| B.joinTo(C, closeOnEnd = true) |
| } |
| |
| yield() |
| |
| launch(coroutineContext + CoroutineName("Reader")) { |
| assertEquals("1OK", C.readUTF8Line()) |
| } |
| yield() |
| |
| launch(coroutineContext + CoroutineName("A->B")) { |
| A.copyAndClose(B) |
| } |
| |
| A.writeStringUtf8("OK\n") |
| A.close() |
| } |
| |
| @Test |
| fun testCopyToThenJoinTo6() = runTest { |
| val A = ByteBufferChannel(autoFlush = false, pool = pool) |
| val B = ByteBufferChannel(autoFlush = false, pool = pool) |
| val C = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| A.writeStringUtf8("1") |
| A.flush() |
| |
| launch(coroutineContext + CoroutineName("B->C")) { |
| B.joinTo(C, closeOnEnd = true) |
| } |
| |
| yield() |
| launch(coroutineContext + CoroutineName("A->B")) { |
| A.copyAndClose(B) |
| } |
| |
| yield() |
| |
| launch(coroutineContext + CoroutineName("Reader")) { |
| assertEquals("1OK", C.readUTF8Line()) |
| } |
| |
| A.writeStringUtf8("OK\n") |
| A.close() |
| } |
| |
| @Test |
| fun testJoinClosed() = runTest { |
| ch.writeInt(777) |
| ch.close() |
| |
| val bc = ByteBufferChannel(autoFlush = false, pool = pool) |
| ch.joinTo(bc, closeOnEnd = true) |
| |
| assertEquals(777, bc.readInt()) |
| assertEquals(0, bc.readRemaining().remaining) |
| } |
| |
| @Test |
| fun testJoinToResumeRead() = runTest { |
| val other = ByteBufferChannel(autoFlush = true, pool = pool) |
| val result = async(coroutineContext) { |
| other.readLong() |
| } |
| yield() |
| |
| launch(coroutineContext) { |
| ch.joinTo(other, true) |
| } |
| yield() |
| yield() |
| |
| ch.writeLong(0x1122334455667788L) |
| yield() |
| assertEquals(0x1122334455667788L, result.await()) |
| |
| ch.close() |
| } |
| |
| @Test |
| fun testJoinToAfterWrite() = runTest { |
| val other = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| ch.writeInt(0x12345678) |
| launch(coroutineContext) { |
| ch.joinTo(other, false) |
| } |
| |
| ch.writeInt(0x11223344) |
| ch.flush() |
| |
| yield() |
| |
| assertEquals(0x12345678, other.readInt()) |
| assertEquals(0x11223344, other.readInt()) |
| ch.close() |
| } |
| |
| @Test |
| fun testJoinToClosed() = runTest { |
| val other = ByteBufferChannel(autoFlush = false, pool = pool) |
| |
| ch.writeInt(0x11223344) |
| ch.close() |
| |
| ch.joinTo(other, true) |
| yield() |
| |
| assertEquals(0x11223344, other.readInt()) |
| assertTrue { other.isClosedForRead } |
| } |
| |
| @Test |
| fun testJoinToDifferentEndian() = runTest { |
| val other = ByteBufferChannel(autoFlush = true, pool = pool) |
| other.writeByteOrder = ByteOrder.LITTLE_ENDIAN |
| ch.writeByteOrder = ByteOrder.BIG_ENDIAN |
| |
| ch.writeInt(0x11223344) // BE |
| |
| launch(coroutineContext) { |
| ch.joinTo(other, true) |
| } |
| |
| yield() |
| |
| ch.writeInt(0x55667788) // BE |
| ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN |
| ch.writeInt(0x0abbccdd) // LE |
| ch.close() |
| |
| other.readByteOrder = ByteOrder.BIG_ENDIAN |
| assertEquals(0x11223344, other.readInt()) // BE |
| assertEquals(0x55667788, other.readInt()) // BE |
| other.readByteOrder = ByteOrder.LITTLE_ENDIAN |
| assertEquals(0x0abbccdd, other.readInt()) // LE |
| } |
| |
| @Test |
| fun testReadThenRead() = runTest { |
| val phase = AtomicInteger(0) |
| |
| val first = launch(coroutineContext) { |
| try { |
| ch.readInt() |
| fail("EOF expected") |
| } catch (expected: ClosedReceiveChannelException) { |
| assertEquals(1, phase.get()) |
| } |
| } |
| |
| yield() |
| |
| val second = launch(coroutineContext) { |
| try { |
| ch.readInt() |
| fail("Should fail with ISE") |
| } catch (expected: IllegalStateException) { |
| } |
| } |
| |
| yield() |
| phase.set(1) |
| ch.close() |
| |
| yield() |
| |
| first.invokeOnCompletion { t -> |
| t?.let { throw it } |
| } |
| second.invokeOnCompletion { t -> |
| t?.let { throw it } |
| } |
| } |
| |
| @Test |
| fun writeThenReadStress() = runTest { |
| ch.close() |
| |
| for (i in 1..50_000 * stressTestMultiplier) { |
| val a = ByteBufferChannel(false, pool) |
| |
| val w = launch { |
| a.writeLong(1) |
| a.close() |
| } |
| val r = launch { |
| a.readLong() |
| } |
| |
| w.join() |
| r.join() |
| } |
| } |
| |
| @Test |
| fun joinToEmptyStress() = runTest { |
| for (i in 1..50_000 * stressTestMultiplier) { |
| val a = ByteBufferChannel(false, pool) |
| |
| launch(coroutineContext) { |
| a.joinTo(ch, true) |
| } |
| |
| yield() |
| |
| a.close() |
| } |
| } |
| |
| @Test |
| fun testJoinToStress() = runTest { |
| for (i in 1..10000 * stressTestMultiplier) { |
| val child = ByteBufferChannel(false, pool) |
| val writer = launch { |
| child.writeLong(999 + i.toLong()) |
| child.close() |
| } |
| |
| child.joinTo(ch, false) |
| assertEquals(999 + i.toLong(), ch.readLong()) |
| writer.join() |
| } |
| |
| assertEquals(0, ch.availableForRead) |
| ch.close() |
| } |
| |
| @Test |
| fun testSequentialJoin() = runTest { |
| val steps = 20_000 * stressTestMultiplier |
| |
| val pipeline = launch(coroutineContext) { |
| for (i in 1..steps) { |
| val child = ByteBufferChannel(false, pool) |
| launch(coroutineContext) { |
| child.writeInt(i) |
| child.close() |
| } |
| child.joinTo(ch, false) |
| } |
| } |
| |
| for (i in 1..steps) { |
| assertEquals(i, ch.readInt()) |
| } |
| |
| pipeline.join() |
| pipeline.invokeOnCompletion { cause -> |
| cause?.let { throw it } |
| } |
| } |
| |
| @Test |
| fun testSequentialJoinYield() = runTest { |
| val steps = 20_000 * stressTestMultiplier |
| |
| val pipeline = launch(coroutineContext) { |
| for (i in 1..steps) { |
| val child = ByteBufferChannel(false, pool) |
| launch(coroutineContext) { |
| child.writeInt(i) |
| child.close() |
| } |
| yield() |
| child.joinTo(ch, false) |
| } |
| } |
| |
| for (i in 1..steps) { |
| assertEquals(i, ch.readInt()) |
| } |
| |
| pipeline.join() |
| pipeline.invokeOnCompletion { cause -> |
| cause?.let { throw it } |
| } |
| } |
| |
| @Test |
| fun testJoinToNoFlush() = runTest { |
| val src = ByteChannel(false) |
| launch(coroutineContext) { |
| src.joinTo(ch, closeOnEnd = false, flushOnEnd = false) |
| assertEquals(0, ch.availableForRead) |
| ch.flush() |
| assertEquals(4, ch.availableForRead) |
| } |
| yield() |
| |
| src.writeInt(777) |
| src.close() |
| } |
| |
| @Test |
| fun testReadBlock() = runTest { |
| var bytesRead = 0L |
| |
| val r: (ByteBuffer) -> Unit = { bb -> |
| bytesRead += bb.remaining() |
| bb.position(bb.limit()) |
| } |
| |
| val j = launch(coroutineContext) { |
| while (!ch.isClosedForRead) { |
| ch.read(0, r) |
| } |
| } |
| |
| yield() |
| |
| ch.writeStringUtf8("OK\n") |
| ch.close() |
| |
| j.join() |
| j.invokeOnCompletion { |
| it?.let { throw it } |
| } |
| } |
| |
| @Test |
| fun testReadBlock2() = runTest { |
| var bytesRead = 0L |
| |
| val r: (ByteBuffer) -> Unit = { bb -> |
| bytesRead += bb.remaining() |
| bb.position(bb.limit()) |
| } |
| |
| val j = launch(coroutineContext) { |
| while (!ch.isClosedForRead) { |
| ch.read(0, r) |
| } |
| } |
| |
| ch.writeStringUtf8("OK\n") |
| yield() |
| ch.close() |
| |
| j.join() |
| j.invokeOnCompletion { |
| it?.let { throw it } |
| } |
| } |
| |
| @Test |
| fun testCancelWriter() = runTest { |
| val sub = writer(DefaultDispatcher) { |
| delay(1000000L) |
| } |
| |
| sub.channel.cancel() |
| sub.join() |
| } |
| |
| @Test |
| fun testCancelReader() = runTest { |
| val sub = reader(DefaultDispatcher) { |
| delay(10000000L) |
| } |
| |
| sub.channel.close(CancellationException()) |
| sub.join() |
| } |
| |
| @Test |
| fun testWriteSuspendSessionSmokeTest() = runTest { |
| ch.writeSuspendSession { |
| val buffer = request(1) |
| assertNotNull(buffer) |
| } |
| |
| ch.writeSuspendSession { |
| val buffer = request(1)!! |
| buffer.put(0x11) |
| written(1) |
| } |
| |
| assertEquals(0, ch.availableForRead) |
| ch.flush() |
| assertEquals(1, ch.availableForRead) |
| assertEquals(0x11, ch.readByte()) |
| } |
| |
| @Test |
| fun testWriteSuspendSessionJoined() = runTest { |
| val next = ByteChannel() |
| launch(Unconfined) { |
| ch.joinTo(next, true) |
| } |
| |
| yield() |
| |
| ch.writeSuspendSession { |
| val buffer = request(1) |
| assertNotNull(buffer) |
| buffer!!.put(0x11) |
| written(1) |
| } |
| |
| assertEquals(0, next.availableForRead) |
| ch.flush() |
| assertEquals(1, next.availableForRead) |
| assertEquals(0x11, next.readByte()) |
| } |
| |
| @Test |
| fun testWriteSuspendSessionJoinDuringWrite() = runTest { |
| val next = ByteChannel() |
| |
| ch.writeSuspendSession { |
| var buffer = request(1) |
| assertNotNull(buffer) |
| buffer!!.put(0x11) |
| written(1) |
| |
| launch(Unconfined) { |
| ch.joinTo(next, true) |
| } |
| |
| yield() |
| |
| assertNull(request(1)) |
| tryAwait(1) |
| buffer = request(1) |
| assertNotNull(buffer) |
| buffer!!.put(0x22) |
| written(1) |
| } |
| |
| ch.flush() |
| |
| assertEquals(2, next.availableForRead) |
| assertEquals(0x11, next.readByte()) |
| assertEquals(0x22, next.readByte()) |
| } |
| |
| @Test |
| fun testJoiningDuringWriteFully() = runTest { |
| val bb = ByteArray(65536) |
| Random().nextBytes(bb) |
| val dest = ByteChannel() |
| |
| launch(coroutineContext) { |
| expect(1) |
| ch.writeFully(bb) |
| ch.flush() |
| } |
| yield() |
| expect(2) |
| |
| launch { |
| expect(3) |
| ch.joinTo(dest, false) |
| } |
| yield() |
| |
| val result = ByteArray(bb.size) |
| dest.readFully(result) |
| |
| Assert.assertArrayEquals(bb, result) |
| finish(4) |
| } |
| |
| private inline fun buildPacket(block: ByteWritePacket.() -> Unit): ByteReadPacket { |
| val builder = BytePacketBuilder(0, pktPool) |
| try { |
| block(builder) |
| return builder.build() |
| } catch (t: Throwable) { |
| builder.release() |
| throw t |
| } |
| } |
| } |