IO: optimize byte channel writePacket suspend tail-calls
diff --git a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
index ef8173c..1486f22 100644
--- a/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
+++ b/integration/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt
@@ -813,42 +813,124 @@
when (packet) {
is ByteReadPacketEmpty -> return
- is ByteReadPacketSingle -> {
- val buffer = packet.steal()
- try {
- writeFully(buffer)
- } finally {
- packet.pool.recycle(buffer)
+ is ByteReadPacketSingle -> writeSingleBufferPacket(packet)
+ is ByteReadPacketImpl -> writeMultipleBufferPacket(packet)
+ else -> writeExternalPacket(packet)
+ }
+ }
+
+ private suspend fun writeSingleBufferPacket(packet: ByteReadPacketSingle) {
+ val buffer = packet.steal()
+ val t = try {
+ writeAsMuchAsPossible(buffer)
+ null
+ } catch (t: Throwable) {
+ t
+ }
+
+ if (t != null) {
+ packet.pool.recycle(buffer)
+ throw t
+ }
+
+ if (buffer.hasRemaining()) {
+ return writeSingleBufferPacketSuspend(buffer, packet)
+ }
+
+ packet.pool.recycle(buffer)
+ }
+
+ private suspend fun writeMultipleBufferPacket(packet: ByteReadPacketImpl) {
+ var buffer: ByteBuffer? = null
+
+ val t = try {
+ while (packet.remaining > 0) {
+ buffer = packet.steal()
+ writeAsMuchAsPossible(buffer)
+ if (buffer.hasRemaining()) break
+ packet.pool.recycle(buffer)
+ }
+ null
+ } catch (t: Throwable) { t }
+
+ if (t != null) {
+ buffer?.let { packet.pool.recycle(it) }
+ packet.release()
+ throw t
+ }
+
+ if (buffer != null) {
+ return writeMultipleBufferPacketSuspend(buffer, packet)
+ }
+
+ packet.release()
+ }
+
+ private suspend fun writeSingleBufferPacketSuspend(buffer: ByteBuffer, packet: ByteReadPacketSingle) {
+ try {
+ writeFully(buffer)
+ } finally {
+ packet.pool.recycle(buffer)
+ }
+ }
+
+ private suspend fun writeMultipleBufferPacketSuspend(rem: ByteBuffer, packet: ByteReadPacketImpl) {
+ var buffer = rem
+
+ try {
+ do {
+ writeFully(buffer)
+ if (packet.remaining == 0) break
+ packet.pool.recycle(buffer)
+ buffer = packet.steal()
+ } while (true)
+ } finally {
+ packet.pool.recycle(buffer)
+ packet.release()
+ }
+ }
+
+ private suspend fun writeExternalPacket(packet: ByteReadPacket) {
+ val buffer = BufferPool.borrow()
+ val t = try {
+ while (packet.remaining > 0) {
+ buffer.clear()
+ packet.readLazy(buffer)
+ buffer.flip()
+ writeAsMuchAsPossible(buffer)
+ if (buffer.hasRemaining()) {
+ buffer.compact()
+ break
}
}
- is ByteReadPacketImpl -> {
- try {
- while (packet.remaining > 0) {
- val buffer = packet.steal()
- try {
- writeFully(buffer)
- } finally {
- packet.pool.recycle(buffer)
- }
- }
- } finally {
- packet.release()
- }
- }
- else -> {
- val buffer = BufferPool.borrow()
- try {
- while (packet.remaining > 0) {
- buffer.clear()
- packet.readLazy(buffer)
- buffer.flip()
- writeFully(buffer)
- }
- } finally {
- BufferPool.recycle(buffer)
- packet.release()
- }
- }
+
+ null
+ } catch (t: Throwable) {
+ t
+ }
+
+ buffer.flip()
+ if (buffer.hasRemaining()) {
+ return writeExternalPacketSuspend(buffer, packet)
+ }
+
+ BufferPool.recycle(buffer)
+ packet.release()
+
+ if (t != null) throw t
+ }
+
+ private suspend fun writeExternalPacketSuspend(buffer: ByteBuffer, packet: ByteReadPacket) {
+ try {
+ do {
+ buffer.compact()
+ packet.readLazy(buffer)
+ buffer.flip()
+ writeFully(buffer)
+ } while (packet.remaining > 0)
+ } finally {
+ BufferPool.recycle(buffer)
+ packet.release()
}
}