blob: 6e35f6a6671b04d552eacff30f3536b5cca34554 [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
Sergey Mashkov86f70612017-07-25 10:59:44 +03005package kotlinx.coroutines.experimental.io
6
Sergey Mashkov72e91502017-09-03 00:17:14 +03007import kotlinx.coroutines.experimental.io.internal.*
Sergey Mashkova22ed422017-09-05 19:24:30 +03008import kotlinx.coroutines.experimental.io.packet.*
Sergey Mashkov6d059932017-10-06 18:46:23 +03009import kotlinx.coroutines.experimental.io.packet.ByteReadPacket
10import kotlinx.io.core.*
Sergey Mashkov86f70612017-07-25 10:59:44 +030011
/**
 * Channel for asynchronous reading of sequences of bytes.
 * This is a **single-reader channel**.
 *
 * Operations on this channel cannot be invoked concurrently.
 */
public interface ByteReadChannel {
    /**
     * Returns the number of bytes that can be read without suspension. Read operations do not suspend and return
     * immediately when this number is at least the number of bytes requested for read.
     */
    public val availableForRead: Int

    /**
     * Returns `true` if the channel is closed and no remaining bytes are available for read.
     * It implies that [availableForRead] is zero.
     */
    public val isClosedForRead: Boolean

    /**
     * NOTE(review): this property was undocumented. Judging by its name it reports whether the writing side
     * of this channel has been closed; confirm against the implementation.
     */
    public val isClosedForWrite: Boolean

    /**
     * Byte order that is used for multi-byte read operations
     * (such as [readShort], [readInt], [readLong], [readFloat], and [readDouble]).
     */
    public var readByteOrder: ByteOrder

    /**
     * Number of bytes read from the channel.
     * It is not guaranteed to be atomic, so it could be updated in the middle of a long running read operation.
     */
    public val totalBytesRead: Long

    /**
     * Reads all available bytes to the [dst] buffer and returns immediately, or suspends if no bytes are available.
     *
     * @return number of bytes that were read or `-1` if the channel has been closed
     */
    suspend fun readAvailable(dst: ByteArray, offset: Int, length: Int): Int

    /**
     * Shortcut for [readAvailable] with `offset = 0` and `length = dst.size`.
     */
    suspend fun readAvailable(dst: ByteArray): Int = readAvailable(dst, 0, dst.size)

    /**
     * Overload of [readAvailable] that reads into the [dst] byte buffer.
     * NOTE(review): presumably follows the same return contract as the [ByteArray] variant; confirm.
     */
    suspend fun readAvailable(dst: ByteBuffer): Int

    /**
     * Overload of [readAvailable] that reads into the [dst] buffer view.
     * NOTE(review): presumably follows the same return contract as the [ByteArray] variant; confirm.
     */
    suspend fun readAvailable(dst: BufferView): Int

    /**
     * Reads all [length] bytes to the [dst] buffer or fails if the channel has been closed.
     * Suspends if not enough bytes are available.
     */
    suspend fun readFully(dst: ByteArray, offset: Int, length: Int)

    /**
     * Shortcut for [readFully] with `offset = 0` and `length = dst.size`.
     */
    suspend fun readFully(dst: ByteArray) = readFully(dst, 0, dst.size)

    /**
     * Overload of [readFully] that reads into the [dst] byte buffer.
     * NOTE(review): the meaning of the returned [Int] is not documented here; presumably the number of bytes
     * that were read. Confirm against the implementation.
     */
    suspend fun readFully(dst: ByteBuffer): Int

    /**
     * Reads the specified amount of bytes and makes a byte packet from them. Fails if the channel has been closed
     * and not enough bytes are available. Accepts [headerSizeHint] to be provided, see [WritePacket].
     */
    suspend fun readPacket(size: Int, headerSizeHint: Int = 0): ByteReadPacket

    /**
     * Reads a long number (suspending if not enough bytes are available) or fails if the channel has been closed
     * and there are not enough bytes.
     */
    suspend fun readLong(): Long

    /**
     * Reads an int number (suspending if not enough bytes are available) or fails if the channel has been closed
     * and there are not enough bytes.
     */
    suspend fun readInt(): Int

    /**
     * Reads a short number (suspending if not enough bytes are available) or fails if the channel has been closed
     * and there are not enough bytes.
     */
    suspend fun readShort(): Short

    /**
     * Reads a byte (suspending if no bytes are available yet) or fails if the channel has been closed
     * and there are not enough bytes.
     */
    suspend fun readByte(): Byte

    /**
     * Reads a boolean value (suspending if no bytes are available yet) or fails if the channel has been closed
     * and there are not enough bytes.
     */
    suspend fun readBoolean(): Boolean

    /**
     * Reads a double number (suspending if not enough bytes are available) or fails if the channel has been closed
     * and there are not enough bytes.
     */
    suspend fun readDouble(): Double

    /**
     * Reads a float number (suspending if not enough bytes are available) or fails if the channel has been closed
     * and there are not enough bytes.
     */
    suspend fun readFloat(): Float

    /**
     * For every available bytes range invokes the [visitor] function until it returns `false`
     * or the end of stream is encountered.
     */
    suspend fun consumeEachBufferRange(visitor: ConsumeEachBufferVisitor)

    /**
     * Runs [visitor] with a [LookAheadSession] receiver and returns its result.
     * NOTE(review): undocumented in the original; session semantics are defined by [LookAheadSession].
     */
    fun <R> lookAhead(visitor: LookAheadSession.() -> R): R

    /**
     * Suspending counterpart of [lookAhead]: runs [visitor] with a [LookAheadSuspendSession] receiver
     * and returns its result.
     * NOTE(review): undocumented in the original; session semantics are defined by [LookAheadSuspendSession].
     */
    suspend fun <R> lookAheadSuspend(visitor: suspend LookAheadSuspendSession.() -> R): R

    /**
     * Reads a line of UTF-8 characters to the specified [out] buffer up to [limit] characters.
     * Supports both CR-LF and LF line endings.
     * Throws an exception if the specified [limit] has been exceeded.
     *
     * @return `true` if a line has been read (possibly empty) or `false` if the channel has been closed
     * and no characters were read.
     */
    suspend fun <A : Appendable> readUTF8LineTo(out: A, limit: Int = Int.MAX_VALUE): Boolean

    /**
     * Invokes [block] when it is possible to read at least [min] bytes,
     * providing a byte buffer to it so the lambda can read from the buffer
     * up to [ByteBuffer.remaining] bytes. If there are no [min] bytes available then the invocation could
     * suspend until the requirement is met.
     *
     * If [min] is zero then the invocation will suspend until at least one byte is available.
     *
     * Warning: it is not guaranteed that all of the remaining bytes will be represented as a single byte buffer,
     * e.g. there could be 4 bytes available for read but the provided byte buffer could have only 2 remaining bytes:
     * in this case you have to invoke read again (with [min] decreased accordingly).
     *
     * It will fail with [EOFException] if not enough bytes ([availableForRead] < [min]) are available
     * in the channel after it is closed.
     *
     * The [block] lambda should modify the buffer's position accordingly. It could also temporarily modify
     * the limit, however it should restore it before returning. It is not recommended to access any bytes of
     * the buffer outside of the provided byte range `[position(); limit())` as there could be any garbage
     * or incomplete data.
     *
     * @param min amount of bytes available for read, should be positive or zero
     * @param block to be invoked when at least [min] bytes are available for read
     */
    suspend fun read(min: Int = 1, block: (ByteBuffer) -> Unit)

    /**
     * Closes the channel with an optional [cause] of cancellation. Unlike [ByteWriteChannel.close] that could
     * close the channel normally, cancel always closes with an error, so any operations on this channel
     * will always fail and all suspensions will be resumed with an exception.
     *
     * Please note that if the channel has been provided by [reader] or [writer] then the corresponding owning
     * coroutine will be cancelled as well.
     *
     * @see ByteWriteChannel.close
     */
    fun cancel(cause: Throwable? = null): Boolean

    /**
     * Discards up to [max] bytes.
     *
     * @return number of bytes that were discarded
     */
    suspend fun discard(max: Long = Long.MAX_VALUE): Long
}
171
/**
 * Visitor accepted by [ByteReadChannel.consumeEachBufferRange]: it is invoked for every available bytes range
 * and returns `false` to stop the iteration.
 *
 * NOTE(review): `last` presumably marks the final range before the end of stream; confirm against
 * the channel implementation.
 */
typealias ConsumeEachBufferVisitor = (buffer: java.nio.ByteBuffer, last: Boolean) -> Boolean
173
/**
 * Joins the receiver channel to [dst].
 *
 * When both channels are [ByteBufferChannel] instances the work is delegated to `dst.joinFrom`; otherwise
 * all bytes of the receiver are copied to [dst] and then [dst] is closed when [closeOnEnd] is `true`,
 * or flushed when [closeOnEnd] is `false` and [flushOnEnd] is `true`.
 *
 * @param dst destination channel, must not be the receiver itself
 * @param closeOnEnd whether [dst] should be closed once the receiver is exhausted
 * @param flushOnEnd whether [dst] should be flushed once the receiver is exhausted (if it is not closed)
 * @throws IllegalArgumentException if [dst] is the receiver channel
 */
suspend fun ByteReadChannel.joinTo(dst: ByteWriteChannel, closeOnEnd: Boolean, flushOnEnd: Boolean = true) {
    require(dst !== this) { "Unable to join a channel to itself" }

    if (this is ByteBufferChannel && dst is ByteBufferChannel) {
        return dst.joinFrom(this, closeOnEnd, flushOnEnd)
    }

    return joinToImplSuspend(dst, closeOnEnd, flushOnEnd)
}
183
/**
 * Generic implementation of [joinTo]: copies all bytes of the receiver to [dst] and afterwards
 * closes [dst] when [close] is `true`, otherwise flushes it when [flush] is `true`.
 */
private suspend fun ByteReadChannel.joinToImplSuspend(dst: ByteWriteChannel, close: Boolean, flush: Boolean) {
    copyToImpl(dst, Long.MAX_VALUE)

    // Closing takes precedence over flushing: flush is only requested explicitly when dst stays open.
    when {
        close -> dst.close()
        flush -> dst.flush()
    }
}
192
/**
 * Reads up to [limit] bytes from the receiver channel and writes them to the [dst] channel.
 * Closes the [dst] channel with the cause exception if it fails to read or write.
 *
 * @param dst destination channel, must not be the receiver itself
 * @param limit maximum number of bytes to copy, must not be negative
 * @return the number of copied bytes
 * @throws IllegalArgumentException if [dst] is the receiver channel or [limit] is negative
 */
suspend fun ByteReadChannel.copyTo(dst: ByteWriteChannel, limit: Long = Long.MAX_VALUE): Long {
    require(this !== dst) { "Unable to copy a channel to itself" }
    require(limit >= 0L) { "limit shouldn't be negative: $limit" }

    // Two ByteBufferChannel instances can copy between each other directly.
    if (this is ByteBufferChannel && dst is ByteBufferChannel) {
        return dst.copyDirect(this, limit, null)
    }

    return copyToImpl(dst, limit)
}
208
/**
 * Generic copy loop behind [copyTo]: moves up to [limit] bytes from the receiver to [dst] through
 * a buffer borrowed from [BufferPool].
 *
 * On any failure [dst] is closed with the failure as the cause and the failure is rethrown.
 * The borrowed buffer is always returned to the pool.
 *
 * @return the number of copied bytes
 */
private suspend fun ByteReadChannel.copyToImpl(dst: ByteWriteChannel, limit: Long): Long {
    val buffer = BufferPool.borrow()
    // With autoFlush disabled on dst, this loop has to flush explicitly.
    val dstNeedsFlush = !dst.autoFlush

    try {
        var copied = 0L

        while (true) {
            buffer.clear()

            val bufferLimit = limit - copied
            if (bufferLimit <= 0) break
            if (bufferLimit < buffer.limit()) {
                // Fewer bytes are left to copy than the buffer can hold: shrink it so we never read past the limit.
                buffer.limit(bufferLimit.toInt())
            }

            val size = readAvailable(buffer)
            if (size == -1) break // the source channel has been closed

            buffer.flip()
            dst.writeFully(buffer)
            copied += size

            // Flush only when the source has nothing more to read right now.
            if (dstNeedsFlush && availableForRead == 0) {
                dst.flush()
            }
        }

        return copied
    } catch (t: Throwable) {
        dst.close(t)
        throw t
    } finally {
        BufferPool.recycle(buffer)
    }
}
Sergey Mashkov7c8b1552017-08-31 15:13:51 +0300243
/**
 * Reads up to [limit] bytes (all the bytes by default) from the receiver channel, writes them to
 * the [dst] channel and then closes [dst].
 * Closes the [dst] channel with the cause exception if it fails to read or write.
 *
 * @param dst destination channel that is closed once copying completes
 * @param limit maximum number of bytes to copy
 * @return the number of copied bytes
 */
suspend fun ByteReadChannel.copyAndClose(dst: ByteWriteChannel, limit: Long = Long.MAX_VALUE): Long {
    val count = copyTo(dst, limit)
    dst.close()
    return count
}
Sergey Mashkova22ed422017-09-05 19:24:30 +0300254
/**
 * Reads the remaining bytes from the receiver channel and builds a packet out of them.
 * If more than [limit] bytes are available, reading simply stops and a packet of size [limit] is returned.
 *
 * The packet under construction is released if reading fails. The buffer borrowed from [BufferPool]
 * is always returned to the pool.
 *
 * @param limit maximum number of bytes to read
 * @return a packet with the bytes that were read
 */
suspend fun ByteReadChannel.readRemaining(limit: Int = Int.MAX_VALUE): ByteReadPacket {
    val buffer = BufferPool.borrow()
    val packet = WritePacket()

    try {
        var copied = 0L

        while (copied < limit) {
            buffer.clear()

            val remaining = limit - copied
            if (remaining < buffer.limit()) {
                // Fewer bytes are left to read than the buffer can hold: shrink it so we never exceed the limit.
                buffer.limit(remaining.toInt())
            }

            val size = readAvailable(buffer)
            if (size == -1) break // the channel has been closed

            buffer.flip()
            packet.writeFully(buffer)
            copied += size
        }

        return packet.build()
    } catch (t: Throwable) {
        packet.release()
        throw t
    } finally {
        BufferPool.recycle(buffer)
    }
}