blob: 50d15236c49fef71bfd86f9b2b8ae4b9076300c7 [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
Sergey Mashkov86f70612017-07-25 10:59:44 +03005package kotlinx.coroutines.experimental.io
6
Sergey Mashkov6d059932017-10-06 18:46:23 +03007import kotlinx.coroutines.experimental.io.packet.*
Sergey Mashkovc4694cc2017-10-09 12:51:34 +03008import kotlinx.coroutines.experimental.io.packet.ByteReadPacket
9import kotlinx.io.core.*
Roman Elizarov83762032017-08-08 19:58:40 +030010import java.nio.CharBuffer
Roman Elizarov469cad32017-08-15 15:54:56 +030011import java.util.concurrent.CancellationException
Sergey Mashkov86f70612017-07-25 10:59:44 +030012
/**
 * Channel for asynchronous writing of sequences of bytes.
 * This is a **single-writer channel**.
 *
 * Operations on this channel cannot be invoked concurrently, unless explicitly specified otherwise
 * in description. Exceptions are [close] and [flush].
 */
public interface ByteWriteChannel {
    /**
     * Returns number of bytes that can be written without suspension. Write operations do not suspend and return
     * immediately when this number is at least the number of bytes requested for write.
     */
    public val availableForWrite: Int

    /**
     * Returns `true` if channel has been closed and attempting to write to the channel will cause an exception.
     */
    public val isClosedForWrite: Boolean

    /**
     * Returns `true` if channel flushes automatically all pending bytes after every write function call.
     * If `false` then flush only happens at manual [flush] invocation or when the buffer is full.
     */
    public val autoFlush: Boolean

    /**
     * Byte order that is used for multi-byte write operations
     * (such as [writeShort], [writeInt], [writeLong], [writeFloat], and [writeDouble]).
     */
    public var writeByteOrder: ByteOrder

    /**
     * Number of bytes written to the channel.
     * It is not guaranteed to be atomic so could be updated in the middle of write operation.
     */
    public val totalBytesWritten: Long

    /**
     * A closure cause exception or `null` if closed successfully or not yet closed.
     */
    public val closedCause: Throwable?

    /**
     * Writes as much as possible and only suspends if buffer is full.
     */
    suspend fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int

    /**
     * Shortcut for [writeAvailable] over the whole array: `offset = 0`, `length = src.size`.
     */
    suspend fun writeAvailable(src: ByteArray): Int = writeAvailable(src, 0, src.size)

    /**
     * [ByteBuffer] variant of [writeAvailable]: writes as much as possible and only suspends if buffer is full.
     */
    suspend fun writeAvailable(src: ByteBuffer): Int

    /**
     * [BufferView] variant of [writeAvailable]: writes as much as possible and only suspends if buffer is full.
     */
    suspend fun writeAvailable(src: BufferView): Int

    /**
     * Writes all [src] bytes and suspends until all bytes written. Causes flush if buffer filled up or when
     * [autoFlush] is enabled.
     * Fails if the channel gets closed while writing.
     */
    suspend fun writeFully(src: ByteArray, offset: Int, length: Int)

    /**
     * Shortcut for [writeFully] over the whole array: `offset = 0`, `length = src.size`.
     */
    suspend fun writeFully(src: ByteArray): Unit = writeFully(src, 0, src.size)

    /**
     * [ByteBuffer] variant of [writeFully]: writes all [src] bytes and suspends until all bytes written.
     */
    suspend fun writeFully(src: ByteBuffer)

    /**
     * [BufferView] variant of [writeFully]: writes all [src] bytes and suspends until all bytes written.
     */
    suspend fun writeFully(src: BufferView)

    /**
     * Invokes [block] when it will be possible to write at least [min] bytes
     * providing byte buffer to it so lambda can write to the buffer
     * up to [ByteBuffer.remaining] bytes. If fewer than [min] bytes of space are available then the invocation could
     * suspend until the requirement is met.
     *
     * Warning: it is not guaranteed that all of remaining bytes will be represented as a single byte buffer
     * eg: it could be 4 bytes available for write but the provided byte buffer could have only 2 remaining bytes:
     * in this case you have to invoke write again (with decreased [min] accordingly).
     *
     * @param min amount of bytes available for write, should be positive
     * @param block to be invoked when at least [min] bytes free capacity available
     */
    suspend fun write(min: Int = 1, block: (ByteBuffer) -> Unit)

    /**
     * Invokes [block] for every free buffer until it returns `false`. It will also suspend every time when no free
     * space available for write.
     *
     * @param block to be invoked when there is free space available for write
     */
    suspend fun writeWhile(block: (ByteBuffer) -> Boolean)

    /**
     * Invokes the suspending [visitor] with a [WriterSuspendSession] as its receiver.
     *
     * NOTE(review): the session contract is defined by [WriterSuspendSession], which is declared outside this
     * file; confirm its semantics there before relying on them.
     */
    suspend fun writeSuspendSession(visitor: suspend WriterSuspendSession.() -> Unit)

    /**
     * Writes a [packet] fully or fails if channel gets closed before the whole packet has been written.
     */
    suspend fun writePacket(packet: ByteReadPacket)

    /**
     * Writes long number and suspends until written.
     * Fails if the channel gets closed while writing.
     */
    suspend fun writeLong(l: Long)

    /**
     * Writes int number and suspends until written.
     * Fails if the channel gets closed while writing.
     */
    suspend fun writeInt(i: Int)

    /**
     * Writes short number and suspends until written.
     * Fails if the channel gets closed while writing.
     */
    suspend fun writeShort(s: Short)

    /**
     * Writes byte and suspends until written.
     * Fails if the channel gets closed while writing.
     */
    suspend fun writeByte(b: Byte)

    /**
     * Writes double number and suspends until written.
     * Fails if the channel gets closed while writing.
     */
    suspend fun writeDouble(d: Double)

    /**
     * Writes float number and suspends until written.
     * Fails if the channel gets closed while writing.
     */
    suspend fun writeFloat(f: Float)

    /**
     * Closes this channel with an optional exceptional [cause].
     * It flushes all pending write bytes (via [flush]).
     * This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
     *
     * A channel that was closed without a [cause], is considered to be _closed normally_.
     * A channel that was closed with non-null [cause] is called a _failed channel_. Attempts to read or
     * write on a failed channel throw this cause exception.
     *
     * After invocation of this operation [isClosedForWrite] starts returning `true` and
     * all subsequent write operations throw [ClosedWriteChannelException] or the specified [cause].
     * However, [isClosedForRead][ByteReadChannel.isClosedForRead] on the side of [ByteReadChannel]
     * starts returning `true` only after all written bytes have been read.
     *
     * Please note that if the channel has been closed with cause and it has been provided by [reader] or [writer]
     * coroutine then the corresponding coroutine will be cancelled with [cause]. If no [cause] provided then no
     * cancellation will be propagated.
     */
    public fun close(cause: Throwable? = null): Boolean

    /**
     * Flushes all pending write bytes making them available for read.
     *
     * This function is thread-safe and can be invoked in any thread at any time.
     * It does nothing when invoked on a closed channel.
     */
    public fun flush()
}
165
/**
 * Writes the low 16 bits of [s] as a short, delegating to the `Short` overload of `writeShort`.
 */
suspend fun ByteWriteChannel.writeShort(s: Int) {
    val low16 = (s and 0xffff).toShort()
    writeShort(low16)
}
169
/**
 * Writes the low 8 bits of [b] as a byte, delegating to the `Byte` overload of `writeByte`.
 */
suspend fun ByteWriteChannel.writeByte(b: Int) {
    val low8 = (b and 0xff).toByte()
    writeByte(low8)
}
173
/**
 * Converts [i] with [Long.toInt] and writes the result via the `Int` overload of `writeInt`.
 */
suspend fun ByteWriteChannel.writeInt(i: Long) {
    val asInt = i.toInt()
    writeInt(asInt)
}
177
/**
 * Collects [s] into a packet by calling `writeStringUtf8` inside a `buildPacket` block,
 * then writes that packet to the channel with `writePacket`.
 */
suspend fun ByteWriteChannel.writeStringUtf8(s: CharSequence) {
    writePacket(buildPacket { writeStringUtf8(s) })
}
185
/**
 * Collects the [CharBuffer] [s] into a packet by calling `writeStringUtf8` inside a `buildPacket` block,
 * then writes that packet to the channel with `writePacket`.
 */
suspend fun ByteWriteChannel.writeStringUtf8(s: CharBuffer) {
    writePacket(buildPacket { writeStringUtf8(s) })
}
193
/**
 * Collects the string [s] into a packet by calling `writeStringUtf8` inside a `buildPacket` block,
 * then writes that packet to the channel with `writePacket`.
 */
suspend fun ByteWriteChannel.writeStringUtf8(s: String) {
    writePacket(buildPacket { writeStringUtf8(s) })
}
201
/**
 * Writes [b] as a single byte: `1` for `true`, `0` for `false`.
 */
suspend fun ByteWriteChannel.writeBoolean(b: Boolean) {
    // The literal expression is passed directly so overload resolution of `writeByte` stays as before.
    writeByte(if (b) 1 else 0)
}
205
/**
 * Writes [ch] as a UTF-16 character: its numeric value is handed to the `Int` overload of `writeShort`.
 */
suspend fun ByteWriteChannel.writeChar(ch: Char) {
    val codeUnit = ch.toInt()
    writeShort(codeUnit)
}
212
/**
 * Builds a packet with `buildPacket(headerSizeHint, builder)` and writes it to the channel.
 *
 * @param headerSizeHint forwarded unchanged to `buildPacket`; defaults to `0`
 * @param builder fills the packet; invoked with a [ByteWritePacket] receiver
 */
inline suspend fun ByteWriteChannel.writePacket(headerSizeHint: Int = 0, builder: ByteWritePacket.() -> Unit) {
    val packet = buildPacket(headerSizeHint, builder)
    writePacket(packet)
}
216
/**
 * Builds a packet by running the suspending [builder] inside a `buildPacket` block,
 * then writes that packet to the channel.
 *
 * @param builder suspending lambda that fills the packet; invoked with a [ByteWritePacket] receiver
 */
suspend fun ByteWriteChannel.writePacketSuspend(builder: suspend ByteWritePacket.() -> Unit) {
    val packet = buildPacket { builder() }
    writePacket(packet)
}
220
/**
 * Indicates an attempt to write on an [isClosedForWrite][ByteWriteChannel.isClosedForWrite] channel
 * that was closed without a cause. A _failed_ channel rethrows the original [close][ByteWriteChannel.close] cause
 * exception on write attempts.
 *
 * @param message detail message passed through to [CancellationException]; may be `null`
 */
public class ClosedWriteChannelException(message: String?) : CancellationException(message)