blob: 7b8506c5ac2dee8f9e1121f90135cae50fcf21bb [file] [log] [blame]
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03001/*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
Sergey Mashkovb87d6362017-08-23 11:58:06 +03005package kotlinx.coroutines.experimental.io
6
Roman Elizarov9fe5f462018-02-21 19:05:52 +03007import kotlinx.coroutines.experimental.*
8import kotlinx.coroutines.experimental.channels.*
Sergey Mashkove6764082017-12-05 12:27:09 +03009import kotlinx.coroutines.experimental.io.internal.*
Sergey Mashkovc51ecd32017-10-27 22:42:39 +030010import org.junit.*
Sergey Mashkovdfae5e22017-12-18 19:31:27 +030011import org.junit.Test
Roman Elizarov9fe5f462018-02-21 19:05:52 +030012import java.io.*
13import kotlin.coroutines.experimental.*
Sergey Mashkovdfae5e22017-12-18 19:31:27 +030014import kotlin.test.*
Sergey Mashkovb87d6362017-08-23 11:58:06 +030015
16class ByteBufferChannelScenarioTest : TestBase() {
17 private val ch = ByteBufferChannel(true)
18
Sergey Mashkovc51ecd32017-10-27 22:42:39 +030019 @After
20 fun finish() {
21 ch.close(InterruptedException())
22 }
23
Sergey Mashkovb87d6362017-08-23 11:58:06 +030024 @Test
25 fun testReadBeforeAvailable() {
26 expect(1)
27
28 runBlocking {
29 launch(coroutineContext) {
30 expect(3)
31
32 val bb = ByteBuffer.allocate(10)
33 val rc = ch.readAvailable(bb) // should suspend
34
35 expect(5)
36 assertEquals(4, rc)
37
38 expect(6)
39 }
40
41 expect(2)
42 yield()
43
44 expect(4)
45 ch.writeInt(0xff) // should resume
46
47 yield()
48
49 finish(7)
50 }
51 }
52
53 @Test
54 fun testReadBeforeAvailable2() {
55 expect(1)
56
57 runBlocking {
58 launch(coroutineContext) {
59 expect(3)
60
61 val bb = ByteBuffer.allocate(4)
62 ch.readFully(bb) // should suspend
63
64 expect(5)
65
66 bb.flip()
67 assertEquals(4, bb.remaining())
68
69 expect(6)
70 }
71
72 expect(2)
73 yield()
74
75 expect(4)
76 ch.writeInt(0xff) // should resume
77
78 yield()
79
80 finish(7)
81 }
82 }
83
84 @Test
85 fun testReadAfterAvailable() {
86 expect(1)
87
88 runBlocking {
89 ch.writeInt(0xff) // should resume
90
91 launch(coroutineContext) {
92 expect(3)
93
94 val bb = ByteBuffer.allocate(10)
95 val rc = ch.readAvailable(bb) // should NOT suspend
96
97 expect(4)
98 assertEquals(4, rc)
99
100 expect(5)
101 }
102
103 expect(2)
104 yield()
105
106 finish(6)
107 }
108 }
109
110 @Test
111 fun testReadAfterAvailable2() {
112 expect(1)
113
114 runBlocking {
115 ch.writeInt(0xff) // should resume
116
117 launch(coroutineContext) {
118 expect(3)
119
120 val bb = ByteBuffer.allocate(4)
121 ch.readFully(bb) // should NOT suspend
122
123 expect(4)
124 bb.flip()
125 assertEquals(4, bb.remaining())
126
127 expect(5)
128 }
129
130 expect(2)
131 yield()
132
133 finish(6)
134 }
135 }
136
137 @Test
138 fun testReadToEmpty() {
139 runBlocking {
140 expect(1)
141
142 val rc = ch.readAvailable(ByteBuffer.allocate(0))
143
144 expect(2)
145
146 assertEquals(0, rc)
147
148 finish(3)
149 }
150 }
151
152 @Test
153 fun testReadToEmptyFromFailedChannel() {
154 runBlocking {
155 expect(1)
156
157 ch.close(IOException())
158
159 try {
160 ch.readAvailable(ByteBuffer.allocate(0))
161 fail("Should throw exception")
162 } catch (expected: IOException) {
163 }
164
165 finish(2)
166 }
167 }
168
169 @Test
170 fun testReadToEmptyFromClosedChannel() {
171 runBlocking {
172 expect(1)
173
174 ch.close()
175
176 val rc = ch.readAvailable(ByteBuffer.allocate(0))
177
178 expect(2)
179
180 assertEquals(-1, rc)
181
182 finish(3)
183 }
184 }
185
186 @Test
187 fun testReadFullyToEmptyFromClosedChannel() {
188 runBlocking {
189 expect(1)
190
191 ch.close()
192
193 ch.readFully(ByteBuffer.allocate(0))
194
195 finish(2)
196 }
197 }
198
199 @Test
200 fun testReadFullyFromClosedChannel() {
201 runBlocking {
202 expect(1)
203
204 ch.close()
205 try {
206 ch.readFully(ByteBuffer.allocate(1))
207 fail("Should throw exception")
208 } catch (expected: ClosedReceiveChannelException) {
209 }
210
211 finish(2)
212 }
213 }
214
215 @Test
216 fun testReadFullyToEmptyFromFailedChannel() {
217 runBlocking {
218 expect(1)
219
220 ch.close(IOException())
221
222 try {
223 ch.readFully(ByteBuffer.allocate(0))
224 fail("Should throw exception")
225 } catch (expected: IOException) {
226 }
227
228 finish(2)
229 }
230 }
Sergey Mashkov75101172017-09-25 19:41:08 +0300231
232 @Test
233 fun testWriteBlock() {
234 runBlocking {
235 launch(coroutineContext) {
236 expect(1)
237
238 ch.write {
239 it.putLong(0x1234567812345678L)
240 }
241
242 expect(2)
243 }
244
245 yield()
246 expect(3)
247
248 assertEquals(0x1234567812345678L, ch.readLong())
249 assertEquals(0, ch.availableForRead)
250
251 finish(4)
252 }
253 }
254
255 @Test
256 fun testWriteBlockSuspend() {
257 runBlocking {
258 launch(coroutineContext) {
259 expect(1)
260
261 ch.writeFully(ByteArray(4088))
262
263 expect(2)
264
265 ch.write(8) {
266 it.putLong(0x1234567812345678L)
267 }
268
269 expect(4)
270 }
271
272 yield()
273 expect(3)
274
275 ch.readFully(ByteArray(9))
276 yield()
277 expect(5)
278
279 ch.readFully(ByteArray(4088 - 9))
280
281 expect(6)
282
283 assertEquals(0x1234567812345678L, ch.readLong())
284 assertEquals(0, ch.availableForRead)
285
286 finish(7)
287 }
288 }
289
290 @Test
291 fun testReadBlock() = runBlocking {
292 ch.writeLong(0x1234567812345678L)
293
294 ch.read {
295 assertEquals(0x1234567812345678L, it.getLong())
296 }
297
298 finish(1)
299 }
300
301 @Test
302 fun testReadBlockSuspend() = runBlocking {
303 ch.writeByte(0x12)
304
305 launch(coroutineContext) {
306 expect(1)
307 ch.read(8) {
308 assertEquals(0x1234567812345678L, it.getLong())
309 }
310
311 expect(3)
312 }
313
314 yield()
315 expect(2)
316
317 ch.writeLong(0x3456781234567800L)
318 yield()
319
320 expect(4)
321 ch.readByte()
322 assertEquals(0, ch.availableForRead)
323
324 finish(5)
325 }
326
327 @Test
328 fun testReadBlockSuspend2() = runBlocking {
329 launch(coroutineContext) {
330 expect(1)
331 ch.read(8) {
332 assertEquals(0x1234567812345678L, it.getLong())
333 }
334
335 expect(3)
336 }
337
338 yield()
339 expect(2)
340
341 ch.writeLong(0x1234567812345678L)
342 yield()
343
344 expect(4)
345 assertEquals(0, ch.availableForRead)
346
347 finish(5)
348 }
Sergey Mashkovbab9f792017-11-07 19:42:15 +0300349
350 @Test
351 fun testWriteByteSuspend() = runBlocking {
352 launch(coroutineContext) {
353 expect(1)
354 ch.writeByte(1)
355 ch.writeFully(ByteArray(ch.availableForWrite))
356 expect(2)
357 ch.writeByte(1)
358 expect(5)
359 ch.close()
360 }
361
362 yield()
363 expect(3)
364 yield()
365 expect(4)
366 yield()
367
368 ch.readByte()
369 yield()
370
371 ch.readRemaining()
372 finish(6)
373 }
374
375 @Test
376 fun testWriteShortSuspend() = runBlocking {
377 launch(coroutineContext) {
378 expect(1)
379 ch.writeByte(1)
380 ch.writeFully(ByteArray(ch.availableForWrite))
381 expect(2)
382 ch.writeShort(1)
383 expect(5)
384 ch.close()
385 }
386
387 yield()
388 expect(3)
389 yield()
390 expect(4)
391 yield()
392
393 ch.readShort()
394 yield()
395
396 ch.readRemaining()
397 finish(6)
398 }
399
400 @Test
401 fun testWriteIntSuspend() = runBlocking {
402 launch(coroutineContext) {
403 expect(1)
404 ch.writeByte(1)
405 ch.writeFully(ByteArray(ch.availableForWrite))
406 expect(2)
407 ch.writeInt(1)
408 expect(5)
409 ch.close()
410 }
411
412 yield()
413 expect(3)
414 yield()
415 expect(4)
416 yield()
417
418 ch.readInt()
419 yield()
420
421 ch.readRemaining()
422 finish(6)
423 }
424
425 @Test
Sergey Mashkove6764082017-12-05 12:27:09 +0300426 fun testWriteIntThenRead() = runBlocking {
427 val size = BUFFER_SIZE - RESERVED_SIZE - 3
428
429 expect(1)
430 ch.writeFully(java.nio.ByteBuffer.allocate(size))
431 ch.flush()
432 expect(2)
433
434 launch(coroutineContext) {
435 expect(4)
436 ch.readPacket(size).release()
437 }
438
439 // coroutine is pending
440 expect(3)
441 ch.writeInt(0x11223344)
442 expect(5)
443
444 assertEquals(0x11223344, ch.readInt())
445
446 finish(6)
447 }
448
449 @Test
Sergey Mashkovbab9f792017-11-07 19:42:15 +0300450 fun testWriteLongSuspend() = runBlocking {
451 launch(coroutineContext) {
452 expect(1)
453 ch.writeByte(1)
454 ch.writeFully(ByteArray(ch.availableForWrite))
455 expect(2)
456 ch.writeLong(1)
457 expect(5)
458 ch.close()
459 }
460
461 yield()
462 expect(3)
463 yield()
464 expect(4)
465 yield()
466
467 ch.readLong()
468 yield()
469
470 ch.readRemaining()
471 finish(6)
472 }
473
Sergey Mashkov4df720f2017-12-05 16:45:55 +0300474 @Test
475 fun testDiscardExisting() = runBlocking {
476 launch(coroutineContext) {
477 expect(1)
478 ch.writeInt(1)
479 ch.writeInt(2)
480 expect(2)
481 }
482
483 yield()
484 expect(3)
485
486 assertEquals(4, ch.discard(4))
487 assertEquals(2, ch.readInt())
488
489 finish(4)
490 }
491
492 @Test
493 fun testDiscardPartiallyExisting() = runBlocking {
494 ch.writeInt(1)
495
496 launch(coroutineContext) {
497 expect(1)
498 assertEquals(8, ch.discard(8))
499 expect(3)
500 }
501
502 yield()
503 expect(2)
504
505 ch.writeInt(2)
506 yield()
507
508 expect(4)
509 assertEquals(0, ch.availableForRead)
510 finish(5)
511 }
512
513 @Test
514 fun testDiscardPartiallyExisting2() = runBlocking {
515 launch(coroutineContext) {
516 expect(1)
517 assertEquals(8, ch.discard(8))
518 expect(4)
519 }
520
521 yield()
522
523 expect(2)
524 ch.writeInt(1)
525 yield()
526 expect(3)
527 assertEquals(0, ch.availableForRead)
528
529 ch.writeInt(2)
530 yield()
531 expect(5)
532 assertEquals(0, ch.availableForRead)
533 finish(6)
534 }
535
536 @Test
537 fun testDiscardClose() = runBlocking {
538 launch(coroutineContext) {
539 expect(1)
540 assertEquals(8, ch.discard())
541 expect(4)
542 }
543
544 yield()
545
546 expect(2)
547 ch.writeInt(1)
548 yield()
549 ch.writeInt(2)
550 yield()
551
552 expect(3)
553 ch.close()
554 yield()
555
556 finish(5)
557 }
Sergey Mashkovdfae5e22017-12-18 19:31:27 +0300558
559 @Test
560 fun testWriteWhile() = runBlocking {
561 val size = 16384
562
563 launch(coroutineContext) {
564 expect(1)
565 var b: Byte = 0
566 var count = 0
567
568 ch.writeWhile { buffer ->
569 while (buffer.hasRemaining() && count < size) {
570 buffer.put(b++)
571 count++
572 }
573 count < size
574 }
575 expect(3)
576 ch.close()
577 }
578
579 yield()
580
581 expect(2)
582
583 val buffer = ByteArray(size)
584 ch.readFully(buffer)
585
586 var expectedB: Byte = 0
587 for (i in buffer.indices) {
588 assertEquals(expectedB, buffer[i])
589 expectedB++
590 }
591
592 yield()
593 yield()
594
595 finish(4)
596 assertTrue(ch.isClosedForRead)
597 }
Sergey Mashkovb87d6362017-08-23 11:58:06 +0300598}