blob: 5bad2340b3e17aeed71b52fa963ca8c47631f9f0 [file] [log] [blame]
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03001/*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
Sergey Mashkov6e61c232017-12-19 15:21:14 +03005package kotlinx.coroutines.experimental.io.jvm.javaio
6
7import kotlinx.atomicfu.*
8import kotlinx.coroutines.experimental.*
9import kotlinx.coroutines.experimental.io.*
10import java.io.*
11import java.util.concurrent.locks.*
12import kotlin.coroutines.experimental.*
13import kotlin.coroutines.experimental.intrinsics.*
14
15/**
16 * Create blocking [java.io.InputStream] for this channel that does block every time the channel suspends at read
17 * Similar to do reading in [runBlocking] however you can pass it to regular blocking API
18 */
19fun ByteReadChannel.toInputStream(parent: Job? = null): InputStream = InputAdapter(parent, this)
20
21/**
22 * Create blocking [java.io.OutputStream] for this channel that does block every time the channel suspends at write
23 * Similar to do reading in [runBlocking] however you can pass it to regular blocking API
24 */
25fun ByteWriteChannel.toOutputStream(parent: Job? = null): OutputStream = OutputAdapter(parent, this)
26
27private class InputAdapter(parent: Job?, private val channel: ByteReadChannel) : InputStream() {
28 private val loop = object : BlockingAdapter(parent) {
29 override suspend fun loop() {
30 var rc = 0
31 while (true) {
32 val buffer = rendezvous(rc) as ByteArray
33 rc = channel.readAvailable(buffer, offset, length)
34 if (rc == -1) break
35 }
36 }
37 }
38
39 private var single: ByteArray? = null
40
41 override fun available(): Int {
42 return channel.availableForRead
43 }
44
45 @Synchronized
46 override fun read(): Int {
47 val buffer = single ?: ByteArray(1).also { single = it }
48 loop.submitAndAwait(buffer, 0, 1)
49 return buffer[0].toInt() and 0xff
50 }
51
52 @Synchronized
53 override fun read(b: ByteArray?, off: Int, len: Int): Int {
54 return loop.submitAndAwait(b!!, off, len)
55 }
56
57 @Synchronized
58 override fun close() {
59 super.close()
60 channel.cancel()
61 loop.shutdown()
62 }
63}
64
65private val CloseToken = Any()
66private val FlushToken = Any()
67
68private class OutputAdapter(parent: Job?, private val channel: ByteWriteChannel) : OutputStream() {
69 private val loop = object : BlockingAdapter(parent) {
70 override suspend fun loop() {
71 try {
72 while (true) {
73 val task = rendezvous(0)
Sergey Mashkovf0c1d992018-02-02 17:18:47 +030074 if (task === CloseToken) {
75 break
76 }
77 else if (task === FlushToken) {
78 channel.flush()
79 channel.closedCause?.let { throw it }
80 }
Sergey Mashkov6e61c232017-12-19 15:21:14 +030081 else if (task is ByteArray) channel.writeFully(task, offset, length)
82 }
83 } catch (t: Throwable) {
84 if (t !is CancellationException) {
85 channel.close(t)
86 }
Sergey Mashkovf0c1d992018-02-02 17:18:47 +030087 throw t
Sergey Mashkov6e61c232017-12-19 15:21:14 +030088 } finally {
Sergey Mashkovf0c1d992018-02-02 17:18:47 +030089 if (!channel.close()) {
90 channel.closedCause?.let { throw it }
91 }
Sergey Mashkov6e61c232017-12-19 15:21:14 +030092 }
93 }
94 }
95
96 private var single: ByteArray? = null
97
98 @Synchronized
99 override fun write(b: Int) {
100 val buffer = single ?: ByteArray(1).also { single = it }
101 buffer[0] = b.toByte()
102 loop.submitAndAwait(buffer, 0, 1)
103 }
104
105 @Synchronized
106 override fun write(b: ByteArray?, off: Int, len: Int) {
107 loop.submitAndAwait(b!!, off, len)
108 }
109
110 @Synchronized
111 override fun flush() {
112 loop.submitAndAwait(FlushToken)
113 }
114
115 @Synchronized
116 override fun close() {
Sergey Mashkovf0c1d992018-02-02 17:18:47 +0300117 try {
118 loop.submitAndAwait(CloseToken)
119 loop.shutdown()
120 } catch (t: Throwable) {
121 throw IOException(t)
122 }
Sergey Mashkov6e61c232017-12-19 15:21:14 +0300123 }
124}
125
126private abstract class BlockingAdapter(val parent: Job? = null) {
127 private val end: Continuation<Unit> = object : Continuation<Unit> {
128 override val context: CoroutineContext
129 get() = if (parent != null) Unconfined + parent else EmptyCoroutineContext
130
131 override fun resume(value: Unit) {
132 var thread: Thread? = null
133 result.value = -1
134 state.update { current ->
135 when (current) {
136 is Thread -> {
137 thread = current
138 Unit
139 }
140 this -> Unit
141 else -> return
142 }
143 }
144
145 thread?.let { LockSupport.unpark(it) }
146 disposable?.dispose()
147 }
148
149 override fun resumeWithException(exception: Throwable) {
150 var thread: Thread? = null
151 var continuation: Continuation<*>? = null
152
153 result.value = -1
154 state.update { current ->
155 when (current) {
156 is Thread -> {
157 thread = current
158 exception
159 }
160 is Continuation<*> -> {
161 continuation = current
162 exception
163 }
164 this -> exception
165 else -> return
166 }
167 }
168
169 thread?.let { LockSupport.unpark(it) }
170 continuation?.resumeWithException(exception)
171
172 if (exception !is CancellationException) {
173 parent?.cancel(exception)
174 }
175
176 disposable?.dispose()
177 }
178 }
179
180 @Suppress("LeakingThis")
181 private val state: AtomicRef<Any> = atomic(this) // could be a thread, a continuation, Unit, an exception or this if not yet started
182 private val result = atomic(0)
183 private val disposable: DisposableHandle? = parent?.invokeOnCompletion { cause ->
184 if (cause != null) {
185 end.resumeWithException(cause)
186 }
187 }
188
189 protected var offset: Int = 0
190 private set
191 protected var length: Int = 0
192 private set
193
194 init {
195 val block: suspend () -> Unit = { loop() }
196 block.startCoroutineUninterceptedOrReturn(end)
197 require(state.value !== this)
198 }
199
200 protected abstract suspend fun loop()
201
202 fun shutdown() {
203 disposable?.dispose()
204 end.resumeWithException(CancellationException("Stream closed"))
205 }
206
207 fun submitAndAwait(buffer: ByteArray, offset: Int, length: Int): Int {
208 this.offset = offset
209 this.length = length
210 return submitAndAwait(buffer)
211 }
212
213 fun submitAndAwait(jobToken: Any): Int {
214 val thread = Thread.currentThread()!!
215
216 var cont: Continuation<Any>? = null
217
218 state.update { value ->
219 when (value) {
220 is Continuation<*> -> {
221 @Suppress("UNCHECKED_CAST")
222 cont = value as Continuation<Any>
223 thread
224 }
225 is Unit -> {
226 return result.value
227 }
228 is Throwable -> {
229 throw value
230 }
231 is Thread -> throw IllegalStateException("There is already thread owning adapter")
232 this -> throw IllegalStateException("Not yet started")
233 else -> NoWhenBranchMatchedException()
234 }
235 }
236
237 cont!!.resume(jobToken)
238
239 while (state.value === thread) {
240 LockSupport.park()
241 }
242
Sergey Mashkovf0c1d992018-02-02 17:18:47 +0300243 state.value.let { state ->
244 if (state is Throwable) {
245 throw state
246 }
247 }
248
Sergey Mashkov6e61c232017-12-19 15:21:14 +0300249 return result.value
250 }
251
252 @Suppress("NOTHING_TO_INLINE")
253 protected suspend inline fun rendezvous(rc: Int): Any {
254 result.value = rc
255
256 return suspendCoroutineOrReturn { c ->
257 var thread: Thread? = null
258
259 state.update { value ->
260 when (value) {
261 is Thread -> {
262 thread = value
263 c
264 }
265 this -> c
266 else -> throw IllegalStateException("Already suspended or in finished state")
267 }
268 }
269
270 if (thread != null) {
271 LockSupport.unpark(thread)
272 }
273
274 COROUTINE_SUSPENDED
275 }
276 }
277}