blob: 8feba42fb280db82a01485b3bc8ca31c7e8793dd [file] [log] [blame]
Sergey Mashkove86eb082017-12-04 18:45:05 +03001package kotlinx.coroutines.experimental.io
2
3import kotlinx.coroutines.experimental.*
4import kotlinx.coroutines.experimental.io.jvm.javaio.*
5import kotlinx.coroutines.experimental.io.jvm.nio.*
6import org.junit.Test
7import java.io.*
8import java.nio.channels.*
Sergey Mashkov6e61c232017-12-19 15:21:14 +03009import java.util.*
Sergey Mashkove86eb082017-12-04 18:45:05 +030010import kotlin.test.*
11
/**
 * Tests for the bridges between [ByteChannel] and blocking Java I/O:
 * the `copyTo` extensions for `java.io` streams and `java.nio` channels (with and without
 * a `limit`), copying through a [Pipe], and the `toInputStream()` / `toOutputStream()` adapters.
 */
class JavaIOTest : TestBase() {
    /** Channel used by the tests that do not create their own channels. */
    private val channel = ByteChannel()

    /** Decodes everything written to this stream so far as UTF-8, the encoding the tests write with. */
    private fun ByteArrayOutputStream.text(): String = toByteArray().toString(Charsets.UTF_8)

    /** Closes both ends of [pipe]; the sink is closed even if closing the source throws. */
    private fun closeBothEnds(pipe: Pipe) {
        try {
            pipe.source().close()
        } finally {
            pipe.sink().close()
        }
    }

    /** Copies a closed channel holding "OK" into an [OutputStream] and checks the copied text. */
    @Test
    fun writeStream() = runBlocking {
        channel.writeStringUtf8("OK")
        channel.close()

        val baos = ByteArrayOutputStream()
        channel.copyTo(baos)

        assertEquals("OK", baos.text())
    }

    /** Copies an [InputStream] holding "OK" into the channel and reads it back as a line. */
    @Test
    fun testReadStream() = runBlocking {
        val stream = ByteArrayInputStream("OK".toByteArray())
        stream.copyTo(channel)
        channel.close()

        assertEquals("OK", channel.readUTF8Line())
    }

    /** Copies a closed channel holding "OK" into an NIO channel wrapping an [OutputStream]. */
    @Test
    fun testNIOWriteChannel() = runBlocking {
        val baos = ByteArrayOutputStream()
        val nioChannel = Channels.newChannel(baos)

        channel.writeStringUtf8("OK")
        channel.close()
        channel.copyTo(nioChannel)

        assertEquals("OK", baos.text())
    }

    /** Copies an NIO channel wrapping an [InputStream] with "OK" into the channel and reads it back. */
    @Test
    fun testNIOReadChannel() = runBlocking {
        val nioChannel = Channels.newChannel(ByteArrayInputStream("OK".toByteArray()))

        nioChannel.copyTo(channel)
        channel.close()

        assertEquals("OK", channel.readUTF8Line())
    }

    /** Same as [writeStream] but with `limit = 1`: only "O" is expected in the output. */
    @Test
    fun writeStreamLimit() = runBlocking {
        channel.writeStringUtf8("OK")
        channel.close()

        val baos = ByteArrayOutputStream()
        channel.copyTo(baos, limit = 1)

        assertEquals("O", baos.text())
    }

    /** Same as [testReadStream] but with `limit = 1`: only "O" is expected in the channel. */
    @Test
    fun testReadStreamLimit() = runBlocking {
        val stream = ByteArrayInputStream("OK".toByteArray())
        stream.copyTo(channel, limit = 1)
        channel.close()

        assertEquals("O", channel.readUTF8Line())
    }

    /** Same as [testNIOWriteChannel] but with `limit = 1`: only "O" is expected in the output. */
    @Test
    fun testNIOWriteChannelLimit() = runBlocking {
        val baos = ByteArrayOutputStream()
        val nioChannel = Channels.newChannel(baos)

        channel.writeStringUtf8("OK")
        channel.close()
        channel.copyTo(nioChannel, limit = 1)

        assertEquals("O", baos.text())
    }

    /** Same as [testNIOReadChannel] but with `limit = 1`: only "O" is expected in the channel. */
    @Test
    fun testNIOReadChannelLimit() = runBlocking {
        val nioChannel = Channels.newChannel(ByteArrayInputStream("OK".toByteArray()))

        nioChannel.copyTo(channel, limit = 1)
        channel.close()

        assertEquals("O", channel.readUTF8Line())
    }

    /**
     * Relays one line through `channel1 -> pipe -> channel2` using two coroutines on a
     * dedicated thread pool and checks that the line arrives.
     *
     * The pipe and the thread pool are released in `finally`, so a failed assertion does not leak them.
     */
    @Test
    fun testPiped() = runBlocking {
        val pipe = Pipe.open()
        val exec = newFixedThreadPoolContext(2, "blocking-io")

        try {
            val channel1 = ByteChannel(autoFlush = false)
            val channel2 = ByteChannel(autoFlush = false)

            val j1 = launch(exec) {
                try {
                    channel1.copyTo(pipe)
                } finally {
                    // Closing the sink is what lets the reading side observe end of stream.
                    pipe.sink().close()
                }
            }

            j1.invokeOnCompletion { cause ->
                if (cause != null) {
                    println("j1 failed with $cause")
                    cause.printStackTrace()
                }
            }

            val j2 = launch(exec) {
                pipe.copyTo(channel2)
                channel2.close()
            }

            j2.invokeOnCompletion { cause ->
                if (cause != null) {
                    println("j2 failed with $cause")
                    cause.printStackTrace()
                }
            }

            channel1.writeStringUtf8("OK\n")
            channel1.close()

            try {
                assertEquals("OK", channel2.readUTF8Line())
            } catch (t: Throwable) {
                // On failure stop both relay coroutines and fail the channels before rethrowing.
                t.printStackTrace()
                j1.cancel()
                j2.cancel()
                channel1.close(t)
                channel2.close(t)
                throw t
            }

            j1.join()
            j2.join()
        } finally {
            try {
                closeBothEnds(pipe)
            } finally {
                exec.close()
            }
        }
    }

    /**
     * Relays 10000 numbered lines through `channel1 -> pipe -> channel2` and checks that every
     * line arrives in order.
     *
     * The relay coroutines are joined before the thread pool is shut down (as [testPiped] does),
     * and the pipe and the thread pool are released in `finally`.
     */
    @Test
    fun testPipedALot() = runBlocking {
        val exec = newFixedThreadPoolContext(2, "blocking-io")
        val numberOfLines = 10000
        val pipe = Pipe.open()

        try {
            val parentJob = checkNotNull(coroutineContext[Job]) { "runBlocking context has no Job" }

            val channel1 = ByteChannel()
            val channel2 = ByteChannel()

            val toPipe = launch(exec, parent = parentJob) {
                try {
                    channel1.copyTo(pipe)
                } finally {
                    // Closing the sink is what lets the reading side observe end of stream.
                    pipe.sink().close()
                }
            }

            val fromPipe = launch(exec, parent = parentJob) {
                pipe.copyTo(channel2)
                channel2.close()
            }

            // The producer runs in the runBlocking context, interleaved with the reads below.
            launch(coroutineContext) {
                for (i in 1..numberOfLines) {
                    channel1.writeStringUtf8("OK $i\n")
                }
                channel1.close()
            }

            for (i in 1..numberOfLines) {
                assertEquals("OK $i", channel2.readUTF8Line())
            }

            // Only reached when every line was received: let the relays finish on the
            // dispatcher before it is closed.
            toPipe.join()
            fromPipe.join()
        } finally {
            try {
                closeBothEnds(pipe)
            } finally {
                exec.close()
            }
        }
    }

    /**
     * Relays "OK" through `channel1 -> pipe -> channel2` with `limit = 1` on both copies and
     * checks that only "O" arrives.
     *
     * The pipe and the thread pool are released in `finally`, so a failed assertion does not leak them.
     */
    @Test
    fun testPipedLimited() = runBlocking {
        val exec = newFixedThreadPoolContext(2, "blocking-io")
        val pipe = Pipe.open()

        try {
            val parentJob = checkNotNull(coroutineContext[Job]) { "runBlocking context has no Job" }

            val channel1 = ByteChannel()
            val channel2 = ByteChannel()

            launch(exec, parent = parentJob) {
                channel1.copyTo(pipe, limit = 1)
            }

            launch(exec, parent = parentJob) {
                pipe.copyTo(channel2, limit = 1)
                channel2.close()
            }

            channel1.writeStringUtf8("OK")
            channel1.close()

            assertEquals("O", channel2.readUTF8Line())
        } finally {
            try {
                closeBothEnds(pipe)
            } finally {
                exec.close()
            }
        }
    }

    /**
     * Reads 100 random bytes through the `toInputStream()` adapter while a coroutine writes them
     * to the channel, then checks for end of stream and that the bytes match.
     */
    @Test
    fun testInputAdapter() {
        newFixedThreadPoolContext(2, "blocking-io").use { exec ->
            val input = channel.toInputStream()
            val data = ByteArray(100)
            Random().nextBytes(data)
            launch(exec) {
                channel.writeFully(data)
                channel.close()
            }

            val result = ByteArray(100)
            assertEquals(100, input.read(result))
            assertEquals(-1, input.read(result))

            assertTrue(result.contentEquals(data))
        }
    }

    /**
     * Repeats 10000 times: a coroutine writes 100 blocks of 4096 bytes into a fresh channel while
     * the test thread drains it through `toInputStream()`, and the total size is checked.
     */
    @Test
    fun testInputAdapter2() {
        newFixedThreadPoolContext(2, "blocking-io").use { exec ->
            val count = 100
            val data = ByteArray(4096)
            Random().nextBytes(data)

            repeat(10000) {
                // A fresh channel per iteration; named so it does not shadow the class-level one.
                val source = ByteChannel(autoFlush = false)
                launch(exec) {
                    for (i in 1..count) {
                        source.writeFully(data)
                    }
                    source.close()
                }

                val result = source.toInputStream().readBytes()
                assertEquals(data.size * count, result.size)
            }
        }
    }

    /**
     * Writes 100 random bytes through the `toOutputStream()` adapter while a coroutine reads
     * them from the channel and verifies the content and end of stream.
     *
     * The reader is started with `async` and awaited, so a failed assertion inside it is
     * rethrown on the test thread.
     */
    @Test
    fun testOutputAdapter() {
        newFixedThreadPoolContext(2, "blocking-io").use { exec ->
            val output = channel.toOutputStream()
            val data = ByteArray(100)
            Random().nextBytes(data)

            val reader = async(exec) {
                val result = ByteArray(100)
                assertEquals(100, channel.readAvailable(result))
                assertEquals(-1, channel.readAvailable(result))
                assertTrue(result.contentEquals(data))
            }

            output.write(data)
            output.flush()
            output.close()

            runBlocking {
                reader.await()
            }
        }
    }
}
279}