IO: fix byte packet write buffer leak
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
index ace28e2..ef8173c 100644
--- a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
@@ -2,15 +2,12 @@
package kotlinx.coroutines.experimental.io
-import kotlinx.coroutines.experimental.CancellableContinuation
-import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException
-import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.io.internal.*
import kotlinx.coroutines.experimental.io.packet.*
-import kotlinx.coroutines.experimental.suspendCancellableCoroutine
-import java.nio.BufferOverflowException
-import java.nio.charset.MalformedInputException
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+import java.nio.*
+import java.util.concurrent.atomic.*
// implementation for ByteChannel
internal class ByteBufferChannel(
@@ -812,7 +809,7 @@
}
suspend override fun writePacket(packet: ByteReadPacket) {
- closed?.sendException?.let { throw it }
+ closed?.sendException?.let { packet.release(); throw it }
when (packet) {
is ByteReadPacketEmpty -> return
@@ -821,17 +818,21 @@
try {
writeFully(buffer)
} finally {
- BufferPool.recycle(buffer)
+ packet.pool.recycle(buffer)
}
}
is ByteReadPacketImpl -> {
- while (packet.remaining > 0) {
- val buffer = packet.steal()
- try {
- writeFully(buffer)
- } finally {
- BufferPool.recycle(buffer)
+ try {
+ while (packet.remaining > 0) {
+ val buffer = packet.steal()
+ try {
+ writeFully(buffer)
+ } finally {
+ packet.pool.recycle(buffer)
+ }
}
+ } finally {
+ packet.release()
}
}
else -> {
@@ -839,12 +840,13 @@
try {
while (packet.remaining > 0) {
buffer.clear()
- packet.readFully(buffer)
+ packet.readLazy(buffer)
buffer.flip()
writeFully(buffer)
}
} finally {
BufferPool.recycle(buffer)
+ packet.release()
}
}
}
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt
index 8e3d956..dd9fad2 100644
--- a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketImpl.kt
@@ -8,7 +8,7 @@
import java.nio.charset.*
import java.util.*
-internal class ByteReadPacketImpl(private val packets: ArrayDeque<ByteBuffer>, private val pool: ObjectPool<ByteBuffer>) : ByteReadPacket {
+internal class ByteReadPacketImpl(private val packets: ArrayDeque<ByteBuffer>, internal val pool: ObjectPool<ByteBuffer>) : ByteReadPacket {
override val remaining: Int
get() = packets.sumBy { it.remaining() }
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketSingle.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketSingle.kt
index 12adaf5..5b4c28b 100644
--- a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketSingle.kt
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/packet/ByteReadPacketSingle.kt
@@ -5,11 +5,11 @@
import java.nio.*
import java.nio.charset.*
-internal class ByteReadPacketSingle(private var buffer: ByteBuffer?, private val pool: ObjectPool<ByteBuffer>) : ByteReadPacket {
+internal class ByteReadPacketSingle(private var buffer: ByteBuffer?, internal val pool: ObjectPool<ByteBuffer>) : ByteReadPacket {
override val remaining: Int
get() = buffer?.remaining() ?: 0
- internal fun steal(): ByteBuffer = buffer ?: throw IllegalStateException("EOF")
+ internal fun steal(): ByteBuffer = buffer?.also { buffer = null } ?: throw IllegalStateException("EOF")
override fun readLazy(dst: ByteArray, offset: Int, length: Int): Int {
var copied = 0