IO: fix byte channel close with error buffer leak
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
index dae080d..ace28e2 100644
--- a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
@@ -63,7 +63,7 @@
val newClosed = if (cause == null) ClosedElement.EmptyCause else ClosedElement(cause)
if (!Closed.compareAndSet(this, null, newClosed)) return false
flush()
- if (state.capacity.isEmpty()) tryTerminate()
+ if (state.capacity.isEmpty() || cause != null) tryTerminate()
resumeClosed(cause)
return true
}
@@ -179,12 +179,25 @@
private fun tryTerminate() {
val closed = closed ?: return
+ var toRelease: ReadWriteBufferState.Initial? = null
+
updateState { state ->
when {
state === ReadWriteBufferState.IdleEmpty -> ReadWriteBufferState.Terminated
+ closed.cause != null && state is ReadWriteBufferState.IdleNonEmpty -> {
+ toRelease = state.initial
+ ReadWriteBufferState.Terminated
+ }
else -> return
}
}
+
+ toRelease?.let { buffer ->
+ if (state === ReadWriteBufferState.Terminated) {
+ releaseBuffer(buffer)
+ }
+ }
+
WriteOp.getAndSet(this, null)?.resumeWithException(closed.sendException)
ReadOp.getAndSet(this, null)?.apply {
if (closed.cause != null) resumeWithException(closed.cause) else resume(false)
diff --git a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/PooledBufferTest.kt b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/PooledBufferTest.kt
index 6b90440..d8d15a0 100644
--- a/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/PooledBufferTest.kt
+++ b/integration/kotlinx-coroutines-io/src/test/kotlin/kotlinx/coroutines/experimental/io/PooledBufferTest.kt
@@ -5,6 +5,7 @@
import kotlinx.coroutines.experimental.runBlocking
import org.junit.After
import org.junit.Test
+import java.io.*
import java.nio.ByteBuffer
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.test.assertEquals
@@ -82,4 +83,14 @@
assertEquals(0, allocated.size)
}
}
+
+ @Test
+ fun testCloseWithEerror() {
+ runBlocking {
+ channel.writeFully("OK".toByteArray())
+ assertEquals(1, allocated.size)
+ channel.close(IOException())
+ assertEquals(0, allocated.size)
+ }
+ }
}
\ No newline at end of file