blob: 9bd810b4d0b61fc74c68257ab01bf2f2c43576c5 [file] [log] [blame]
Sergey Mashkov86f70612017-07-25 10:59:44 +03001package kotlinx.coroutines.experimental.io
2
Sergey Mashkov72e91502017-09-03 00:17:14 +03003import kotlinx.coroutines.experimental.io.internal.*
Sergey Mashkova22ed422017-09-05 19:24:30 +03004import kotlinx.coroutines.experimental.io.packet.*
Sergey Mashkov6d059932017-10-06 18:46:23 +03005import kotlinx.coroutines.experimental.io.packet.ByteReadPacket
6import kotlinx.io.core.*
Sergey Mashkov86f70612017-07-25 10:59:44 +03007
Roman Elizarov469cad32017-08-15 15:54:56 +03008/**
9 * Channel for asynchronous reading of sequences of bytes.
10 * This is a **single-reader channel**.
11 *
12 * Operations on this channel cannot be invoked concurrently.
13 */
14public interface ByteReadChannel {
Sergey Mashkov86f70612017-07-25 10:59:44 +030015 /**
Roman Elizarov469cad32017-08-15 15:54:56 +030016 * Returns number of bytes that can be read without suspension. Read operations do no suspend and return
17 * immediately when this number is at least the number of bytes requested for read.
Sergey Mashkov86f70612017-07-25 10:59:44 +030018 */
Roman Elizarov469cad32017-08-15 15:54:56 +030019 public val availableForRead: Int
Sergey Mashkov86f70612017-07-25 10:59:44 +030020
21 /**
Roman Elizarov469cad32017-08-15 15:54:56 +030022 * Returns `true` if the channel is closed and no remaining bytes are available for read.
23 * It implies that [availableForRead] is zero.
Sergey Mashkov86f70612017-07-25 10:59:44 +030024 */
Roman Elizarov469cad32017-08-15 15:54:56 +030025 public val isClosedForRead: Boolean
Sergey Mashkov86f70612017-07-25 10:59:44 +030026
Sergey Mashkov4cd00142017-09-05 19:25:32 +030027 public val isClosedForWrite: Boolean
28
Sergey Mashkov86f70612017-07-25 10:59:44 +030029 /**
Roman Elizarov469cad32017-08-15 15:54:56 +030030 * Byte order that is used for multi-byte read operations
31 * (such as [readShort], [readInt], [readLong], [readFloat], and [readDouble]).
32 */
33 public var readByteOrder: ByteOrder
34
35 /**
Sergey Mashkovaae062d2017-09-07 15:12:50 +030036 * Number of bytes read from the channel.
37 * It is not guaranteed to be atomic so could be updated in the middle of long running read operation.
38 */
39 public val totalBytesRead: Long
40
41 /**
Roman Elizarov469cad32017-08-15 15:54:56 +030042 * Reads all available bytes to [dst] buffer and returns immediately or suspends if no bytes available
Sergey Mashkov86f70612017-07-25 10:59:44 +030043 * @return number of bytes were read or `-1` if the channel has been closed
44 */
Roman Elizarov469cad32017-08-15 15:54:56 +030045 suspend fun readAvailable(dst: ByteArray, offset: Int, length: Int): Int
46 suspend fun readAvailable(dst: ByteArray) = readAvailable(dst, 0, dst.size)
47 suspend fun readAvailable(dst: ByteBuffer): Int
Sergey Mashkovc4694cc2017-10-09 12:51:34 +030048 suspend fun readAvailable(dst: BufferView): Int
Sergey Mashkov86f70612017-07-25 10:59:44 +030049
50 /**
51 * Reads all [length] bytes to [dst] buffer or fails if channel has been closed.
52 * Suspends if not enough bytes available.
53 */
54 suspend fun readFully(dst: ByteArray, offset: Int, length: Int)
Sergey Mashkov86f70612017-07-25 10:59:44 +030055 suspend fun readFully(dst: ByteArray) = readFully(dst, 0, dst.size)
Roman Elizarov469cad32017-08-15 15:54:56 +030056 suspend fun readFully(dst: ByteBuffer): Int
Sergey Mashkov86f70612017-07-25 10:59:44 +030057
58 /**
Sergey Mashkova10b6922017-08-03 13:41:54 +030059 * Reads the specified amount of bytes and makes a byte packet from them. Fails if channel has been closed
Sergey Mashkov57906eb2017-09-19 14:08:06 +030060 * and not enough bytes available. Accepts [headerSizeHint] to be provided, see [WritePacket].
Sergey Mashkova10b6922017-08-03 13:41:54 +030061 */
Sergey Mashkov57906eb2017-09-19 14:08:06 +030062 suspend fun readPacket(size: Int, headerSizeHint: Int = 0): ByteReadPacket
Sergey Mashkova10b6922017-08-03 13:41:54 +030063
64 /**
Sergey Mashkov86f70612017-07-25 10:59:44 +030065 * Reads a long number (suspending if not enough bytes available) or fails if channel has been closed
66 * and not enough bytes.
67 */
68 suspend fun readLong(): Long
69
70 /**
71 * Reads an int number (suspending if not enough bytes available) or fails if channel has been closed
72 * and not enough bytes.
73 */
74 suspend fun readInt(): Int
75
76 /**
77 * Reads a short number (suspending if not enough bytes available) or fails if channel has been closed
78 * and not enough bytes.
79 */
80 suspend fun readShort(): Short
81
82 /**
83 * Reads a byte (suspending if no bytes available yet) or fails if channel has been closed
84 * and not enough bytes.
85 */
86 suspend fun readByte(): Byte
87
88 /**
89 * Reads a boolean value (suspending if no bytes available yet) or fails if channel has been closed
90 * and not enough bytes.
91 */
92 suspend fun readBoolean(): Boolean
93
94 /**
Sergey Mashkov86f70612017-07-25 10:59:44 +030095 * Reads double number (suspending if not enough bytes available) or fails if channel has been closed
96 * and not enough bytes.
97 */
98 suspend fun readDouble(): Double
99
100 /**
101 * Reads float number (suspending if not enough bytes available) or fails if channel has been closed
102 * and not enough bytes.
103 */
104 suspend fun readFloat(): Float
105
Sergey Mashkov4cd00142017-09-05 19:25:32 +0300106 /**
107 * For every available bytes range invokes [visitor] function until it return false or end of stream encountered
108 */
109 suspend fun consumeEachBufferRange(visitor: (buffer: ByteBuffer, last: Boolean) -> Boolean)
110
111 fun <R> lookAhead(visitor: LookAheadSession.() -> R): R
112 suspend fun <R> lookAheadSuspend(visitor: suspend LookAheadSuspendSession.() -> R): R
Sergey Mashkov86f70612017-07-25 10:59:44 +0300113
114 /**
115 * Reads a line of UTF-8 characters to the specified [out] buffer up to [limit] characters.
116 * Supports both CR-LF and LF line endings.
117 * Throws an exception if the specified [limit] has been exceeded.
118 *
119 * @return `true` if line has been read (possibly empty) or `false` if channel has been closed
120 * and no characters were read.
121 */
122 suspend fun <A : Appendable> readUTF8LineTo(out: A, limit: Int = Int.MAX_VALUE): Boolean
Sergey Mashkov75101172017-09-25 19:41:08 +0300123
124 /**
125 * Invokes [block] when it will be possible to read at least [min] bytes
126 * providing byte buffer to it so lambda can read from the buffer
127 * up to [ByteBuffer.remaining] bytes. If there are no [min] bytes available then the invocation could
128 * suspend until the requirement will be met.
129 *
Sergey Mashkove86eb082017-12-04 18:45:05 +0300130 * If [min] is zero then the invocation will suspend until at least one byte available.
131 *
Sergey Mashkov75101172017-09-25 19:41:08 +0300132 * Warning: it is not guaranteed that all of remaining bytes will be represented as a single byte buffer
133 * eg: it could be 4 bytes available for read but the provided byte buffer could have only 2 remaining bytes:
134 * in this case you have to invoke read again (with decreased [min] accordingly).
135 *
Sergey Mashkove86eb082017-12-04 18:45:05 +0300136 * It will fail with [EOFException] if not enough bytes ([availableForRead] < [min]) available
137 * in the channel after it is closed.
138 *
139 * [block] lambda should modify buffer's position accordingly. It also could temporarily modify limit however
140 * it should restore it before return. It is not recommended to access any bytes of the buffer outside of the
141 * provided byte range [position(); limit()) as there could be any garbage or incomplete data.
142 *
143 * @param min amount of bytes available for read, should be positive or zero
Sergey Mashkov75101172017-09-25 19:41:08 +0300144 * @param block to be invoked when at least [min] bytes available for read
145 */
146 suspend fun read(min: Int = 1, block: (ByteBuffer) -> Unit)
Sergey Mashkoveee3aaf2017-12-05 15:51:55 +0300147
148 suspend fun cancel(cause: Throwable?): Boolean
Roman Elizarov469cad32017-08-15 15:54:56 +0300149}
150
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300151suspend fun ByteReadChannel.joinTo(dst: ByteWriteChannel, closeOnEnd: Boolean) {
152 require(dst !== this)
153
154 if (this is ByteBufferChannel && dst is ByteBufferChannel) {
155 return dst.joinFrom(this, closeOnEnd)
156 }
157
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300158 return joinToImplSuspend(dst, closeOnEnd)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300159}
160
Sergey Mashkovf75ec152017-11-14 14:11:46 +0300161private suspend fun ByteReadChannel.joinToImplSuspend(dst: ByteWriteChannel, close: Boolean) {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300162 copyToImpl(dst, Long.MAX_VALUE)
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300163 if (close) {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300164 dst.close()
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300165 } else {
166 dst.flush()
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300167 }
168}
169
Sergey Mashkov72e91502017-09-03 00:17:14 +0300170/**
171 * Reads up to [limit] bytes from receiver channel and writes them to [dst] channel.
172 * Closes [dst] channel if fails to read or write with cause exception.
173 * @return a number of copied bytes
174 */
175suspend fun ByteReadChannel.copyTo(dst: ByteWriteChannel, limit: Long = Long.MAX_VALUE): Long {
Sergey Mashkov88951ac2017-10-27 22:36:08 +0300176 require(this !== dst)
177 require(limit >= 0L)
178
179 if (this is ByteBufferChannel && dst is ByteBufferChannel) {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300180 return dst.copyDirect(this, limit, null)
Sergey Mashkov88951ac2017-10-27 22:36:08 +0300181 }
182
183 return copyToImpl(dst, limit)
184}
185
186private suspend fun ByteReadChannel.copyToImpl(dst: ByteWriteChannel, limit: Long): Long {
Sergey Mashkov72e91502017-09-03 00:17:14 +0300187 val buffer = BufferPool.borrow()
Sergey Mashkov88951ac2017-10-27 22:36:08 +0300188 val dstNeedsFlush = !dst.autoFlush
189
Roman Elizarov469cad32017-08-15 15:54:56 +0300190 try {
191 var copied = 0L
Roman Elizarov469cad32017-08-15 15:54:56 +0300192
Sergey Mashkov88951ac2017-10-27 22:36:08 +0300193 while (true) {
Sergey Mashkov72e91502017-09-03 00:17:14 +0300194 buffer.clear()
Sergey Mashkov88951ac2017-10-27 22:36:08 +0300195
196 val bufferLimit = limit - copied
197 if (bufferLimit <= 0) break
198 if (bufferLimit < buffer.limit()) {
199 buffer.limit(bufferLimit.toInt())
Sergey Mashkov72e91502017-09-03 00:17:14 +0300200 }
201 val size = readAvailable(buffer)
Roman Elizarov469cad32017-08-15 15:54:56 +0300202 if (size == -1) break
203
Sergey Mashkov72e91502017-09-03 00:17:14 +0300204 buffer.flip()
205 dst.writeFully(buffer)
Roman Elizarov469cad32017-08-15 15:54:56 +0300206 copied += size
Sergey Mashkov88951ac2017-10-27 22:36:08 +0300207
208 if (dstNeedsFlush && availableForRead == 0) {
209 dst.flush()
210 }
Roman Elizarov469cad32017-08-15 15:54:56 +0300211 }
Roman Elizarov469cad32017-08-15 15:54:56 +0300212 return copied
213 } catch (t: Throwable) {
214 dst.close(t)
215 throw t
216 } finally {
Sergey Mashkov72e91502017-09-03 00:17:14 +0300217 BufferPool.recycle(buffer)
Roman Elizarov469cad32017-08-15 15:54:56 +0300218 }
219}
Sergey Mashkov7c8b1552017-08-31 15:13:51 +0300220
Sergey Mashkov72e91502017-09-03 00:17:14 +0300221/**
222 * Reads all the bytes from receiver channel and writes them to [dst] channel and then closes it.
223 * Closes [dst] channel if fails to read or write with cause exception.
224 * @return a number of copied bytes
225 */
Sergey Mashkov7c8b1552017-08-31 15:13:51 +0300226suspend fun ByteReadChannel.copyAndClose(dst: ByteWriteChannel): Long {
227 val count = copyTo(dst)
228 dst.close()
229 return count
230}
Sergey Mashkova22ed422017-09-05 19:24:30 +0300231
232/**
233 * Reads all the bytes from receiver channel and builds a packet that is returned unless the specified [limit] exceeded.
234 * It will simply stop reading and return packet of size [limit] in this case
235 */
Sergey Mashkov2c3a0dc2017-09-06 22:58:08 +0300236suspend fun ByteReadChannel.readRemaining(limit: Int = Int.MAX_VALUE): ByteReadPacket {
Sergey Mashkova22ed422017-09-05 19:24:30 +0300237 val buffer = BufferPool.borrow()
238 val packet = WritePacket()
239
240 try {
241 var copied = 0L
242
243 while (copied < limit) {
244 buffer.clear()
245 if (limit - copied < buffer.limit()) {
246 buffer.limit((limit - copied).toInt())
247 }
248 val size = readAvailable(buffer)
249 if (size == -1) break
250
251 buffer.flip()
252 packet.writeFully(buffer)
253 copied += size
254 }
255
256 return packet.build()
257 } catch (t: Throwable) {
258 packet.release()
259 throw t
260 } finally {
261 BufferPool.recycle(buffer)
262 }
263}