blob: 41d5ee00a5887ff1f9db0ea011ecfa4d47533fee [file] [log] [blame]
Sergey Mashkov6e61c232017-12-19 15:21:14 +03001package kotlinx.coroutines.experimental.io.jvm.javaio
2
3import kotlinx.atomicfu.*
4import kotlinx.coroutines.experimental.*
5import kotlinx.coroutines.experimental.io.*
6import java.io.*
7import java.util.concurrent.locks.*
8import kotlin.coroutines.experimental.*
9import kotlin.coroutines.experimental.intrinsics.*
10
11/**
12 * Create blocking [java.io.InputStream] for this channel that does block every time the channel suspends at read
13 * Similar to do reading in [runBlocking] however you can pass it to regular blocking API
14 */
15fun ByteReadChannel.toInputStream(parent: Job? = null): InputStream = InputAdapter(parent, this)
16
17/**
18 * Create blocking [java.io.OutputStream] for this channel that does block every time the channel suspends at write
19 * Similar to do reading in [runBlocking] however you can pass it to regular blocking API
20 */
21fun ByteWriteChannel.toOutputStream(parent: Job? = null): OutputStream = OutputAdapter(parent, this)
22
23private class InputAdapter(parent: Job?, private val channel: ByteReadChannel) : InputStream() {
24 private val loop = object : BlockingAdapter(parent) {
25 override suspend fun loop() {
26 var rc = 0
27 while (true) {
28 val buffer = rendezvous(rc) as ByteArray
29 rc = channel.readAvailable(buffer, offset, length)
30 if (rc == -1) break
31 }
32 }
33 }
34
35 private var single: ByteArray? = null
36
37 override fun available(): Int {
38 return channel.availableForRead
39 }
40
41 @Synchronized
42 override fun read(): Int {
43 val buffer = single ?: ByteArray(1).also { single = it }
44 loop.submitAndAwait(buffer, 0, 1)
45 return buffer[0].toInt() and 0xff
46 }
47
48 @Synchronized
49 override fun read(b: ByteArray?, off: Int, len: Int): Int {
50 return loop.submitAndAwait(b!!, off, len)
51 }
52
53 @Synchronized
54 override fun close() {
55 super.close()
56 channel.cancel()
57 loop.shutdown()
58 }
59}
60
61private val CloseToken = Any()
62private val FlushToken = Any()
63
64private class OutputAdapter(parent: Job?, private val channel: ByteWriteChannel) : OutputStream() {
65 private val loop = object : BlockingAdapter(parent) {
66 override suspend fun loop() {
67 try {
68 while (true) {
69 val task = rendezvous(0)
Sergey Mashkovf0c1d992018-02-02 17:18:47 +030070 if (task === CloseToken) {
71 break
72 }
73 else if (task === FlushToken) {
74 channel.flush()
75 channel.closedCause?.let { throw it }
76 }
Sergey Mashkov6e61c232017-12-19 15:21:14 +030077 else if (task is ByteArray) channel.writeFully(task, offset, length)
78 }
79 } catch (t: Throwable) {
80 if (t !is CancellationException) {
81 channel.close(t)
82 }
Sergey Mashkovf0c1d992018-02-02 17:18:47 +030083 throw t
Sergey Mashkov6e61c232017-12-19 15:21:14 +030084 } finally {
Sergey Mashkovf0c1d992018-02-02 17:18:47 +030085 if (!channel.close()) {
86 channel.closedCause?.let { throw it }
87 }
Sergey Mashkov6e61c232017-12-19 15:21:14 +030088 }
89 }
90 }
91
92 private var single: ByteArray? = null
93
94 @Synchronized
95 override fun write(b: Int) {
96 val buffer = single ?: ByteArray(1).also { single = it }
97 buffer[0] = b.toByte()
98 loop.submitAndAwait(buffer, 0, 1)
99 }
100
101 @Synchronized
102 override fun write(b: ByteArray?, off: Int, len: Int) {
103 loop.submitAndAwait(b!!, off, len)
104 }
105
106 @Synchronized
107 override fun flush() {
108 loop.submitAndAwait(FlushToken)
109 }
110
111 @Synchronized
112 override fun close() {
Sergey Mashkovf0c1d992018-02-02 17:18:47 +0300113 try {
114 loop.submitAndAwait(CloseToken)
115 loop.shutdown()
116 } catch (t: Throwable) {
117 throw IOException(t)
118 }
Sergey Mashkov6e61c232017-12-19 15:21:14 +0300119 }
120}
121
122private abstract class BlockingAdapter(val parent: Job? = null) {
123 private val end: Continuation<Unit> = object : Continuation<Unit> {
124 override val context: CoroutineContext
125 get() = if (parent != null) Unconfined + parent else EmptyCoroutineContext
126
127 override fun resume(value: Unit) {
128 var thread: Thread? = null
129 result.value = -1
130 state.update { current ->
131 when (current) {
132 is Thread -> {
133 thread = current
134 Unit
135 }
136 this -> Unit
137 else -> return
138 }
139 }
140
141 thread?.let { LockSupport.unpark(it) }
142 disposable?.dispose()
143 }
144
145 override fun resumeWithException(exception: Throwable) {
146 var thread: Thread? = null
147 var continuation: Continuation<*>? = null
148
149 result.value = -1
150 state.update { current ->
151 when (current) {
152 is Thread -> {
153 thread = current
154 exception
155 }
156 is Continuation<*> -> {
157 continuation = current
158 exception
159 }
160 this -> exception
161 else -> return
162 }
163 }
164
165 thread?.let { LockSupport.unpark(it) }
166 continuation?.resumeWithException(exception)
167
168 if (exception !is CancellationException) {
169 parent?.cancel(exception)
170 }
171
172 disposable?.dispose()
173 }
174 }
175
176 @Suppress("LeakingThis")
177 private val state: AtomicRef<Any> = atomic(this) // could be a thread, a continuation, Unit, an exception or this if not yet started
178 private val result = atomic(0)
179 private val disposable: DisposableHandle? = parent?.invokeOnCompletion { cause ->
180 if (cause != null) {
181 end.resumeWithException(cause)
182 }
183 }
184
185 protected var offset: Int = 0
186 private set
187 protected var length: Int = 0
188 private set
189
190 init {
191 val block: suspend () -> Unit = { loop() }
192 block.startCoroutineUninterceptedOrReturn(end)
193 require(state.value !== this)
194 }
195
196 protected abstract suspend fun loop()
197
198 fun shutdown() {
199 disposable?.dispose()
200 end.resumeWithException(CancellationException("Stream closed"))
201 }
202
203 fun submitAndAwait(buffer: ByteArray, offset: Int, length: Int): Int {
204 this.offset = offset
205 this.length = length
206 return submitAndAwait(buffer)
207 }
208
209 fun submitAndAwait(jobToken: Any): Int {
210 val thread = Thread.currentThread()!!
211
212 var cont: Continuation<Any>? = null
213
214 state.update { value ->
215 when (value) {
216 is Continuation<*> -> {
217 @Suppress("UNCHECKED_CAST")
218 cont = value as Continuation<Any>
219 thread
220 }
221 is Unit -> {
222 return result.value
223 }
224 is Throwable -> {
225 throw value
226 }
227 is Thread -> throw IllegalStateException("There is already thread owning adapter")
228 this -> throw IllegalStateException("Not yet started")
229 else -> NoWhenBranchMatchedException()
230 }
231 }
232
233 cont!!.resume(jobToken)
234
235 while (state.value === thread) {
236 LockSupport.park()
237 }
238
Sergey Mashkovf0c1d992018-02-02 17:18:47 +0300239 state.value.let { state ->
240 if (state is Throwable) {
241 throw state
242 }
243 }
244
Sergey Mashkov6e61c232017-12-19 15:21:14 +0300245 return result.value
246 }
247
248 @Suppress("NOTHING_TO_INLINE")
249 protected suspend inline fun rendezvous(rc: Int): Any {
250 result.value = rc
251
252 return suspendCoroutineOrReturn { c ->
253 var thread: Thread? = null
254
255 state.update { value ->
256 when (value) {
257 is Thread -> {
258 thread = value
259 c
260 }
261 this -> c
262 else -> throw IllegalStateException("Already suspended or in finished state")
263 }
264 }
265
266 if (thread != null) {
267 LockSupport.unpark(thread)
268 }
269
270 COROUTINE_SUSPENDED
271 }
272 }
273}