blob: 31aa1323e1daf7f612f1aa65e0bfc96c1d4afd5b [file] [log] [blame]
Sergey Mashkove86eb082017-12-04 18:45:05 +03001package kotlinx.coroutines.experimental.io.jvm.nio
2
3import kotlinx.coroutines.experimental.io.*
4import java.nio.channels.*
5
/**
 * Copies up to [limit] bytes from this blocking NIO channel to the CIO channel [ch].
 *
 * Suspends while [ch] has no space available for writing, but may block the calling thread
 * if the source NIO channel blocks on read. When [ch] is not in auto-flush mode it is flushed
 * after every write.
 *
 * @param ch destination channel
 * @param limit maximum number of bytes to copy, must not be negative
 * @return number of bytes copied
 * @throws IllegalArgumentException if [limit] is negative or if this channel is a
 * [SelectableChannel] configured in non-blocking mode
 */
suspend fun ReadableByteChannel.copyTo(ch: ByteWriteChannel, limit: Long = Long.MAX_VALUE): Long {
    require(limit >= 0L) { "Limit shouldn't be negative: $limit" }
    if (this is SelectableChannel && !isBlocking) {
        throw IllegalArgumentException("Non-blocking channels are not supported")
    }

    var copied = 0L
    var eof = false

    // Performs a single read and records either end of stream or the number of bytes received.
    val readOnce: (ByteBuffer) -> Unit = { bb ->
        val rc = read(bb)
        if (rc == -1) eof = true
        else copied += rc
    }

    val copy: (ByteBuffer) -> Unit = { bb ->
        val rem = limit - copied
        if (rem < bb.remaining()) {
            // Fewer bytes are allowed than the buffer could take: shrink the limit for the read.
            // rem < remaining() here, so it always fits into an Int.
            val l = bb.limit()
            bb.limit(bb.position() + rem.toInt())
            try {
                readOnce(bb)
            } finally {
                // Restore the original limit even if read() throws, so the buffer handed to us
                // is never left truncated.
                bb.limit(l)
            }
        } else {
            readOnce(bb)
        }
    }

    val needFlush = !ch.autoFlush
    while (copied < limit && !eof) {
        // Request at least one byte of free space so that every iteration can make progress.
        ch.write(1, copy)
        if (needFlush) ch.flush()
    }

    return copied
}
45
/**
 * Copies at most [limit] bytes out of this blocking NIO pipe into the CIO channel [ch].
 * Delegates to the `copyTo` extension invoked on the pipe's readable source channel.
 *
 * @return number of bytes copied
 */
suspend fun Pipe.copyTo(ch: ByteWriteChannel, limit: Long = Long.MAX_VALUE): Long {
    val readableEnd = source()
    return readableEnd.copyTo(ch, limit)
}