blob: b074a4a1bef6f77471dc9e37861aaa598f00c4df [file] [log] [blame]
Sergey Mashkov86f70612017-07-25 10:59:44 +03001package kotlinx.coroutines.experimental.io
2
Sergey Mashkov32c90b12017-11-07 19:18:16 +03003import kotlinx.coroutines.experimental.*
Roman Elizarov96d7a882017-08-30 15:17:05 +03004import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException
Roman Elizarov75675e62017-11-30 15:14:44 +03005import kotlinx.coroutines.experimental.io.internal.BUFFER_SIZE
6import kotlinx.coroutines.experimental.io.internal.RESERVED_SIZE
7import kotlinx.coroutines.experimental.io.internal.ReadWriteBufferState
Sergey Mashkov50a08572017-10-09 12:56:07 +03008import kotlinx.coroutines.experimental.io.packet.ByteReadPacket
Roman Elizarov75675e62017-11-30 15:14:44 +03009import kotlinx.coroutines.experimental.io.packet.ByteWritePacket
10import kotlinx.io.core.BufferView
11import kotlinx.io.core.BytePacketBuilder
12import kotlinx.io.core.readUTF8Line
13import kotlinx.io.pool.DefaultPool
14import kotlinx.io.pool.NoPoolImpl
15import org.junit.After
16import org.junit.Rule
17import org.junit.Test
Roman Elizarov96d7a882017-08-30 15:17:05 +030018import org.junit.rules.ErrorCollector
19import org.junit.rules.Timeout
Roman Elizarov96d7a882017-08-30 15:17:05 +030020import java.nio.CharBuffer
Sergey Mashkov86f70612017-07-25 10:59:44 +030021import java.util.*
Roman Elizarov75675e62017-11-30 15:14:44 +030022import java.util.concurrent.CountDownLatch
23import java.util.concurrent.TimeUnit
24import java.util.concurrent.atomic.AtomicInteger
25import kotlin.test.assertEquals
26import kotlin.test.assertNotEquals
27import kotlin.test.assertTrue
28import kotlin.test.fail
Sergey Mashkov86f70612017-07-25 10:59:44 +030029
Roman Elizarov6129c942018-01-10 17:34:40 +030030class ByteBufferChannelTest : TestBase() {
Sergey Mashkov86f70612017-07-25 10:59:44 +030031 @get:Rule
Roman Elizarove084cf72018-01-11 10:09:58 +030032 val timeout = Timeout(100L * stressTestMultiplier, TimeUnit.SECONDS)
Sergey Mashkov86f70612017-07-25 10:59:44 +030033
34 @get:Rule
Sergey Mashkov57906eb2017-09-19 14:08:06 +030035 private val failures = ErrorCollector()
36
37 @get:Rule
Sergey Mashkov32c90b12017-11-07 19:18:16 +030038 internal val pool = VerifyingObjectPool(object : NoPoolImpl<ReadWriteBufferState.Initial>() {
39 override fun borrow(): ReadWriteBufferState.Initial {
40 return ReadWriteBufferState.Initial(java.nio.ByteBuffer.allocate(4096))
41 }
42 })
Sergey Mashkov57906eb2017-09-19 14:08:06 +030043
44 @get:Rule
Sergey Mashkovccf8dde2017-09-29 18:18:00 +030045 internal val pktPool = VerifyingObjectPool(BufferView.Pool)
Sergey Mashkov86f70612017-07-25 10:59:44 +030046
Roman Elizarov83762032017-08-08 19:58:40 +030047 private val Size = BUFFER_SIZE - RESERVED_SIZE
Sergey Mashkov57906eb2017-09-19 14:08:06 +030048 private val ch = ByteBufferChannel(autoFlush = false, pool = pool)
Sergey Mashkov86f70612017-07-25 10:59:44 +030049
Sergey Mashkovc51ecd32017-10-27 22:42:39 +030050 @After
51 fun finish() {
52 ch.close(InterruptedException())
53 }
54
Sergey Mashkov86f70612017-07-25 10:59:44 +030055 @Test
56 fun testBoolean() {
57 runBlocking {
58 ch.writeBoolean(true)
59 ch.flush()
60 assertEquals(true, ch.readBoolean())
61
62 ch.writeBoolean(false)
63 ch.flush()
64 assertEquals(false, ch.readBoolean())
65 }
66 }
67
68 @Test
69 fun testByte() {
70 runBlocking {
Roman Elizarov469cad32017-08-15 15:54:56 +030071 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +030072 ch.writeByte(-1)
73 ch.flush()
Roman Elizarov469cad32017-08-15 15:54:56 +030074 assertEquals(1, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +030075 assertEquals(-1, ch.readByte())
Roman Elizarov469cad32017-08-15 15:54:56 +030076 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +030077 }
78 }
79
80 @Test
81 fun testShortB() {
82 runBlocking {
83 ch.readByteOrder = ByteOrder.BIG_ENDIAN
84 ch.writeByteOrder = ByteOrder.BIG_ENDIAN
85
Roman Elizarov469cad32017-08-15 15:54:56 +030086 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +030087 ch.writeShort(-1)
Roman Elizarov469cad32017-08-15 15:54:56 +030088 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +030089 ch.flush()
Roman Elizarov469cad32017-08-15 15:54:56 +030090 assertEquals(2, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +030091 assertEquals(-1, ch.readShort())
Roman Elizarov469cad32017-08-15 15:54:56 +030092 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +030093 }
94 }
95
96 @Test
97 fun testShortL() {
98 runBlocking {
99 ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
100 ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
101
Roman Elizarov469cad32017-08-15 15:54:56 +0300102 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300103 ch.writeShort(-1)
Roman Elizarov469cad32017-08-15 15:54:56 +0300104 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300105 ch.flush()
Roman Elizarov469cad32017-08-15 15:54:56 +0300106 assertEquals(2, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300107 assertEquals(-1, ch.readShort())
Roman Elizarov469cad32017-08-15 15:54:56 +0300108 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300109 }
110 }
111
112 @Test
113 fun testShortEdge() {
114 runBlocking {
115 ch.writeByte(1)
116
117 for (i in 2 until Size step 2) {
118 ch.writeShort(0x00ee)
119 }
120
121 ch.flush()
122
123 ch.readByte()
124 ch.writeShort(0x1234)
125
126 ch.flush()
127
Roman Elizarov469cad32017-08-15 15:54:56 +0300128 while (ch.availableForRead > 2) {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300129 ch.readShort()
130 }
131
132 assertEquals(0x1234, ch.readShort())
133 }
134 }
135
136 @Test
137 fun testIntB() {
138 runBlocking {
139 ch.readByteOrder = ByteOrder.BIG_ENDIAN
140 ch.writeByteOrder = ByteOrder.BIG_ENDIAN
141
Roman Elizarov469cad32017-08-15 15:54:56 +0300142 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300143 ch.writeInt(-1)
144 ch.flush()
Roman Elizarov469cad32017-08-15 15:54:56 +0300145 assertEquals(4, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300146 assertEquals(-1, ch.readInt())
Roman Elizarov469cad32017-08-15 15:54:56 +0300147 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300148 }
149 }
150
151 @Test
152 fun testIntL() {
153 runBlocking {
154 ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
155 ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
156
Roman Elizarov469cad32017-08-15 15:54:56 +0300157 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300158 ch.writeInt(-1)
159 ch.flush()
Roman Elizarov469cad32017-08-15 15:54:56 +0300160 assertEquals(4, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300161 assertEquals(-1, ch.readInt())
Roman Elizarov469cad32017-08-15 15:54:56 +0300162 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300163 }
164 }
165
166 @Test
167 fun testIntEdge() {
168 runBlocking {
169 for (shift in 1..3) {
170 for (i in 1..shift) {
171 ch.writeByte(1)
172 }
173
174 repeat(Size / 4 - 1) {
175 ch.writeInt(0xeeeeeeeeL)
176 }
177
178 ch.flush()
179
180 for (i in 1..shift) {
181 ch.readByte()
182 }
183
184 ch.writeInt(0x12345678)
185
186 ch.flush()
187
Roman Elizarov469cad32017-08-15 15:54:56 +0300188 while (ch.availableForRead > 4) {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300189 ch.readInt()
190 }
191
192 assertEquals(0x12345678, ch.readInt())
193 }
194 }
195 }
196
197 @Test
198 fun testLongB() {
199 runBlocking {
200 ch.readByteOrder = ByteOrder.BIG_ENDIAN
201 ch.writeByteOrder = ByteOrder.BIG_ENDIAN
202
Roman Elizarov469cad32017-08-15 15:54:56 +0300203 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300204 ch.writeLong(Long.MIN_VALUE)
205 ch.flush()
Roman Elizarov469cad32017-08-15 15:54:56 +0300206 assertEquals(8, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300207 assertEquals(Long.MIN_VALUE, ch.readLong())
Roman Elizarov469cad32017-08-15 15:54:56 +0300208 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300209 }
210 }
211
212 @Test
213 fun testLongL() {
214 runBlocking {
215 ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
216 ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
217
Roman Elizarov469cad32017-08-15 15:54:56 +0300218 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300219 ch.writeLong(Long.MIN_VALUE)
220 ch.flush()
Roman Elizarov469cad32017-08-15 15:54:56 +0300221 assertEquals(8, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300222 assertEquals(Long.MIN_VALUE, ch.readLong())
Roman Elizarov469cad32017-08-15 15:54:56 +0300223 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300224 }
225 }
226
227 @Test
228 fun testLongEdge() {
229 runBlocking {
230 for (shift in 1..7) {
231 for (i in 1..shift) {
232 ch.writeByte(1)
233 }
234
235 repeat(Size / 8 - 1) {
236 ch.writeLong(0x11112222eeeeeeeeL)
237 }
238
239 ch.flush()
240 for (i in 1..shift) {
241 ch.readByte()
242 }
243
244 ch.writeLong(0x1234567812345678L)
245 ch.flush()
246
Roman Elizarov469cad32017-08-15 15:54:56 +0300247 while (ch.availableForRead > 8) {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300248 ch.readLong()
249 }
250
251 assertEquals(0x1234567812345678L, ch.readLong())
252 }
253 }
254 }
255
256 @Test
257 fun testDoubleB() {
258 runBlocking {
259 ch.readByteOrder = ByteOrder.BIG_ENDIAN
260 ch.writeByteOrder = ByteOrder.BIG_ENDIAN
261
Roman Elizarov469cad32017-08-15 15:54:56 +0300262 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300263 ch.writeDouble(1.05)
264 ch.flush()
265
Roman Elizarov469cad32017-08-15 15:54:56 +0300266 assertEquals(8, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300267 assertEquals(1.05, ch.readDouble())
Roman Elizarov469cad32017-08-15 15:54:56 +0300268 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300269 }
270 }
271
272 @Test
273 fun testDoubleL() {
274 runBlocking {
275 ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
276 ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
277
Roman Elizarov469cad32017-08-15 15:54:56 +0300278 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300279 ch.writeDouble(1.05)
280 ch.flush()
281
Roman Elizarov469cad32017-08-15 15:54:56 +0300282 assertEquals(8, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300283 assertEquals(1.05, ch.readDouble())
Roman Elizarov469cad32017-08-15 15:54:56 +0300284 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300285 }
286 }
287
288 @Test
289 fun testFloatB() {
290 runBlocking {
291 ch.readByteOrder = ByteOrder.BIG_ENDIAN
292 ch.writeByteOrder = ByteOrder.BIG_ENDIAN
293
Roman Elizarov469cad32017-08-15 15:54:56 +0300294 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300295 ch.writeFloat(1.05f)
296 ch.flush()
297
Roman Elizarov469cad32017-08-15 15:54:56 +0300298 assertEquals(4, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300299 assertEquals(1.05f, ch.readFloat())
Roman Elizarov469cad32017-08-15 15:54:56 +0300300 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300301 }
302 }
303
304 @Test
305 fun testFloatL() {
306 runBlocking {
307 ch.readByteOrder = ByteOrder.LITTLE_ENDIAN
308 ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
309
Roman Elizarov469cad32017-08-15 15:54:56 +0300310 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300311 ch.writeFloat(1.05f)
312 ch.flush()
313
Roman Elizarov469cad32017-08-15 15:54:56 +0300314 assertEquals(4, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300315 assertEquals(1.05f, ch.readFloat())
Roman Elizarov469cad32017-08-15 15:54:56 +0300316 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300317 }
318 }
319
Roman Elizarovfccac1d2017-08-15 11:58:43 +0300320
321
Sergey Mashkov86f70612017-07-25 10:59:44 +0300322 @Test
323 fun testEndianMix() {
Roman Elizarovfccac1d2017-08-15 11:58:43 +0300324 val byteOrders = listOf(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300325 runBlocking {
Roman Elizarovfccac1d2017-08-15 11:58:43 +0300326 for (writeOrder in byteOrders) {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300327 ch.writeByteOrder = writeOrder
328
Roman Elizarovfccac1d2017-08-15 11:58:43 +0300329 for (readOrder in byteOrders) {
Sergey Mashkov86f70612017-07-25 10:59:44 +0300330 ch.readByteOrder = readOrder
331
Roman Elizarov469cad32017-08-15 15:54:56 +0300332 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300333 ch.writeShort(0x001f)
334 ch.flush()
335 if (writeOrder == readOrder)
336 assertEquals(0x001f, ch.readShort())
337 else
338 assertEquals(0x1f00, ch.readShort())
339
Roman Elizarov469cad32017-08-15 15:54:56 +0300340 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300341 ch.writeShort(0x001f)
342 ch.flush()
343 if (writeOrder == readOrder)
344 assertEquals(0x001f, ch.readShort())
345 else
346 assertEquals(0x1f00, ch.readShort())
347
Roman Elizarov469cad32017-08-15 15:54:56 +0300348 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300349 ch.writeInt(0x1f)
350 ch.flush()
351 if (writeOrder == readOrder)
352 assertEquals(0x0000001f, ch.readInt())
353 else
354 assertEquals(0x1f000000, ch.readInt())
355
Roman Elizarov469cad32017-08-15 15:54:56 +0300356 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300357 ch.writeInt(0x1fL)
358 ch.flush()
359 if (writeOrder == readOrder)
360 assertEquals(0x0000001f, ch.readInt())
361 else
362 assertEquals(0x1f000000, ch.readInt())
363
Roman Elizarov469cad32017-08-15 15:54:56 +0300364 assertEquals(0, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300365 ch.writeLong(0x1f)
366 ch.flush()
367 if (writeOrder == readOrder)
368 assertEquals(0x1f, ch.readLong())
369 else
370 assertEquals(0x1f00000000000000L, ch.readLong())
371 }
372 }
373 }
374 }
375
376 @Test
377 fun testClose() {
378 runBlocking {
379 ch.writeByte(1)
380 ch.writeByte(2)
381 ch.writeByte(3)
382
383 ch.flush()
384 assertEquals(1, ch.readByte())
385 ch.close()
386
387 assertEquals(2, ch.readByte())
388 assertEquals(3, ch.readByte())
389
390 try {
391 ch.readByte()
392 fail()
393 } catch (expected: ClosedReceiveChannelException) {
394 }
395 }
396 }
397
398 @Test
399 fun testReadAndWriteFully() {
400 runBlocking {
401 val bytes = byteArrayOf(1, 2, 3, 4, 5)
402 val dst = ByteArray(5)
403
404 ch.writeFully(bytes)
405 ch.flush()
Roman Elizarov469cad32017-08-15 15:54:56 +0300406 assertEquals(5, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300407 ch.readFully(dst)
408 assertTrue { dst.contentEquals(bytes) }
409
410 ch.writeFully(bytes)
411 ch.flush()
412
413 val dst2 = ByteArray(4)
414 ch.readFully(dst2)
415
Roman Elizarov469cad32017-08-15 15:54:56 +0300416 assertEquals(1, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300417 assertEquals(5, ch.readByte())
418
419 ch.close()
420
421 try {
422 ch.readFully(dst)
423 fail("")
424 } catch (expected: ClosedReceiveChannelException) {
425 }
426 }
427 }
428
429 @Test
430 fun testReadAndWriteFullyByteBuffer() {
431 runBlocking {
432 val bytes = byteArrayOf(1, 2, 3, 4, 5)
433 val dst = ByteArray(5)
434
435 ch.writeFully(ByteBuffer.wrap(bytes))
436 ch.flush()
Roman Elizarov469cad32017-08-15 15:54:56 +0300437 assertEquals(5, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300438 ch.readFully(ByteBuffer.wrap(dst))
439 assertTrue { dst.contentEquals(bytes) }
440
441 ch.writeFully(ByteBuffer.wrap(bytes))
442 ch.flush()
443
444 val dst2 = ByteArray(4)
445 ch.readFully(ByteBuffer.wrap(dst2))
446
Roman Elizarov469cad32017-08-15 15:54:56 +0300447 assertEquals(1, ch.availableForRead)
Sergey Mashkov86f70612017-07-25 10:59:44 +0300448 assertEquals(5, ch.readByte())
449
450 ch.close()
451
452 try {
453 ch.readFully(ByteBuffer.wrap(dst))
454 fail("")
455 } catch (expected: ClosedReceiveChannelException) {
456 }
457 }
458 }
459
460 @Test
461 fun testReadAndWritePartially() {
462 runBlocking {
463 val bytes = byteArrayOf(1, 2, 3, 4, 5)
464
Roman Elizarov469cad32017-08-15 15:54:56 +0300465 assertEquals(5, ch.writeAvailable(bytes))
Sergey Mashkov86f70612017-07-25 10:59:44 +0300466 ch.flush()
Roman Elizarov469cad32017-08-15 15:54:56 +0300467 assertEquals(5, ch.readAvailable(ByteArray(100)))
Sergey Mashkov86f70612017-07-25 10:59:44 +0300468
469 repeat(Size / bytes.size) {
Roman Elizarov469cad32017-08-15 15:54:56 +0300470 assertNotEquals(0, ch.writeAvailable(bytes))
Sergey Mashkov86f70612017-07-25 10:59:44 +0300471 ch.flush()
472 }
473
Roman Elizarov469cad32017-08-15 15:54:56 +0300474 ch.readAvailable(ByteArray(ch.availableForRead - 1))
475 assertEquals(1, ch.readAvailable(ByteArray(100)))
Sergey Mashkov86f70612017-07-25 10:59:44 +0300476
477 ch.close()
478 }
479 }
480
481 @Test
482 fun testReadAndWritePartiallyByteBuffer() {
483 runBlocking {
484 val bytes = byteArrayOf(1, 2, 3, 4, 5)
485
Roman Elizarov469cad32017-08-15 15:54:56 +0300486 assertEquals(5, ch.writeAvailable(ByteBuffer.wrap(bytes)))
Sergey Mashkov86f70612017-07-25 10:59:44 +0300487 ch.flush()
Roman Elizarov469cad32017-08-15 15:54:56 +0300488 assertEquals(5, ch.readAvailable(ByteBuffer.allocate(100)))
Sergey Mashkov86f70612017-07-25 10:59:44 +0300489
490 repeat(Size / bytes.size) {
Roman Elizarov469cad32017-08-15 15:54:56 +0300491 assertNotEquals(0, ch.writeAvailable(ByteBuffer.wrap(bytes)))
Sergey Mashkov86f70612017-07-25 10:59:44 +0300492 ch.flush()
493 }
494
Roman Elizarov469cad32017-08-15 15:54:56 +0300495 ch.readAvailable(ByteArray(ch.availableForRead - 1))
496 assertEquals(1, ch.readAvailable(ByteBuffer.allocate(100)))
Sergey Mashkov86f70612017-07-25 10:59:44 +0300497
498 ch.close()
499 }
500 }
501
502
503 @Test
504 fun testReadAndWriteBig() {
505 val count = 200
506 val bytes = ByteArray(65536)
507 Random().nextBytes(bytes)
508
509 launch(CommonPool + CoroutineName("writer")) {
510 for (i in 1..count) {
511 ch.writeFully(bytes)
512 ch.flush()
513 }
514 }.invokeOnCompletion { t ->
515 if (t != null) {
516 failures.addError(t)
517 }
518 }
519
520 runBlocking(CoroutineName("reader")) {
521 val dst = ByteArray(bytes.size)
522 for (i in 1..count) {
523 ch.readFully(dst)
524 assertTrue { dst.contentEquals(bytes) }
525 dst.fill(0)
526 }
527 }
528 }
529
530 @Test
531 fun testReadAndWriteBigByteBuffer() {
532 val count = 200
533 val bytes = ByteArray(65536)
534 Random().nextBytes(bytes)
535
536 launch(CommonPool + CoroutineName("writer")) {
537 for (i in 1..count) {
538 ch.writeFully(ByteBuffer.wrap(bytes))
539 ch.flush()
540 }
541 }.invokeOnCompletion { t ->
542 if (t != null) {
543 failures.addError(t)
544 }
545 }
546
547 runBlocking(CoroutineName("reader")) {
548 val dst = ByteArray(bytes.size)
549 for (i in 1..count) {
550 ch.readFully(ByteBuffer.wrap(dst))
551 assertTrue { dst.contentEquals(bytes) }
552 dst.fill(0)
553 }
554 }
555 }
Sergey Mashkova10b6922017-08-03 13:41:54 +0300556
557 @Test
558 fun testPacket() = runBlocking {
559 val packet = buildPacket {
560 writeInt(0xffee)
Sergey Mashkovfbcba262017-08-23 12:26:18 +0300561 writeStringUtf8("Hello")
Sergey Mashkova10b6922017-08-03 13:41:54 +0300562 }
563
564 ch.writeInt(packet.remaining)
565 ch.writePacket(packet)
566
567 ch.flush()
568
569 val size = ch.readInt()
570 val readed = ch.readPacket(size)
571
572 assertEquals(0xffee, readed.readInt())
573 assertEquals("Hello", readed.readUTF8Line())
574 }
575
576 @Test
577 fun testBigPacket() = runBlocking {
578 launch(CommonPool + CoroutineName("writer")) {
579 val packet = buildPacket {
580 writeInt(0xffee)
Sergey Mashkovfbcba262017-08-23 12:26:18 +0300581 writeStringUtf8(".".repeat(8192))
Sergey Mashkova10b6922017-08-03 13:41:54 +0300582 }
583
584 ch.writeInt(packet.remaining)
585 ch.writePacket(packet)
586
587 ch.flush()
588 }
589
590 val size = ch.readInt()
591 val readed = ch.readPacket(size)
592
593 assertEquals(0xffee, readed.readInt())
594 assertEquals(".".repeat(8192), readed.readUTF8Line())
595 }
Sergey Mashkovfbcba262017-08-23 12:26:18 +0300596
597 @Test
598 fun testWriteString() = runBlocking {
599 ch.writeStringUtf8("abc")
600 ch.close()
601
602 assertEquals("abc", ch.readASCIILine())
603 }
604
605 @Test
606 fun testWriteCharSequence() = runBlocking {
607 ch.writeStringUtf8("abc" as CharSequence)
608 ch.close()
609
610 assertEquals("abc", ch.readASCIILine())
611 }
612
613 @Test
614 fun testWriteCharBuffer() = runBlocking {
615 val cb = CharBuffer.allocate(6)
616
617 for (i in 0 until cb.remaining()) {
618 cb.put(i, ' ')
619 }
620
621 cb.position(2)
622 cb.put(2, 'a')
623 cb.put(3, 'b')
624 cb.put(4, 'c')
625 cb.limit(5)
626
627 assertEquals("abc", cb.slice().toString())
628
629 ch.writeStringUtf8(cb)
630 ch.close()
631
632 assertEquals("abc", ch.readASCIILine())
633 }
Sergey Mashkov57906eb2017-09-19 14:08:06 +0300634
Sergey Mashkov32c90b12017-11-07 19:18:16 +0300635 @Test
Sergey Mashkov88951ac2017-10-27 22:36:08 +0300636 fun testCopyLarge() {
Roman Elizarov6129c942018-01-10 17:34:40 +0300637 val count = 100 * 256 * stressTestMultiplier // * 8192
Sergey Mashkov88951ac2017-10-27 22:36:08 +0300638
639 launch {
640 val bb = ByteBuffer.allocate(8192)
641 for (i in 0 until bb.capacity()) {
642 bb.put((i and 0xff).toByte())
643 }
644
645 for (i in 1..count) {
646 bb.clear()
647 val split = i and 0x1fff
648
649 bb.limit(split)
650 ch.writeFully(bb)
651 yield()
652 bb.limit(bb.capacity())
653 ch.writeFully(bb)
654 }
655
656 ch.close()
657 }
658
659 val dest = ByteBufferChannel(true, pool)
660
Sergey Mashkovd489d3f2017-11-08 07:15:01 +0300661 val joinerJob = launch {
Sergey Mashkovb86a0032017-11-08 16:39:03 +0300662 ch.copyAndClose(dest)
663 }
664
665 val reader = launch {
666 val bb = ByteBuffer.allocate(8192)
667
668 for (i in 1..count) {
669 bb.clear()
670 dest.readFully(bb)
671 bb.flip()
672
673 if (i and 0x1fff == 0) {
674 for (idx in 0 until bb.capacity()) {
675 assertEquals((idx and 0xff).toByte(), bb.get())
676 }
677 }
678 }
679
680 yield()
681 assertTrue(dest.isClosedForRead)
682 }
683
684 runBlocking {
685 reader.join()
686 joinerJob.join()
687 dest.close()
688 ch.close()
689 }
690 }
691
692 @Test
693 fun testJoinToLarge() {
Roman Elizarov6129c942018-01-10 17:34:40 +0300694 val count = 100 * 256 * stressTestMultiplier // * 8192
Sergey Mashkovb86a0032017-11-08 16:39:03 +0300695
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300696 val writerJob = launch {
Sergey Mashkovb86a0032017-11-08 16:39:03 +0300697 val bb = ByteBuffer.allocate(8192)
698 for (i in 0 until bb.capacity()) {
699 bb.put((i and 0xff).toByte())
700 }
701
702 for (i in 1..count) {
703 bb.clear()
704 val split = i and 0x1fff
705
706 bb.limit(split)
707 ch.writeFully(bb)
708 yield()
709 bb.limit(bb.capacity())
710 ch.writeFully(bb)
711 }
712
713 ch.close()
714 }
715
716 val dest = ByteBufferChannel(true, pool)
717
718 val joinerJob = launch {
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300719 ch.joinTo(dest, true)
Sergey Mashkov88951ac2017-10-27 22:36:08 +0300720 }
721
722 val reader = launch {
723 val bb = ByteBuffer.allocate(8192)
724
725 for (i in 1..count) {
726 bb.clear()
727 dest.readFully(bb)
728 bb.flip()
729
730 if (i and 0x1fff == 0) {
731 for (idx in 0 until bb.capacity()) {
732 assertEquals((idx and 0xff).toByte(), bb.get())
733 }
734 }
735 }
736
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300737 bb.clear()
738 assertEquals(-1, dest.readAvailable(bb))
Sergey Mashkov88951ac2017-10-27 22:36:08 +0300739 assertTrue(dest.isClosedForRead)
740 }
741
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300742 val latch = CountDownLatch(1)
743 val r = AtomicInteger(3)
744
745 val handler: CompletionHandler = { t ->
746 t?.let { failures.addError(it); latch.countDown() }
747 if (r.decrementAndGet() == 0) latch.countDown()
Sergey Mashkov88951ac2017-10-27 22:36:08 +0300748 }
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300749
Roman Elizarov1f0df4b2017-12-18 16:29:52 +0300750 reader.invokeOnCompletion(onCancelling = true, handler = handler)
751 writerJob.invokeOnCompletion(onCancelling = true, handler = handler)
752 joinerJob.invokeOnCompletion(onCancelling = true, handler = handler)
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300753
754 latch.await()
Sergey Mashkov88951ac2017-10-27 22:36:08 +0300755 }
756
757 private fun launch(block: suspend () -> Unit): Job {
758 return launch(DefaultDispatcher) {
759 try {
760 block()
761 } catch (t: Throwable) {
762 failures.addError(t)
763 }
764 }
765 }
766
767 @Test
Sergey Mashkov32c90b12017-11-07 19:18:16 +0300768 fun testStressReadWriteFully() = runBlocking {
769 val size = 100
770 val data = ByteArray(size) { it.toByte() }
771 val exec = newFixedThreadPoolContext(8, "testStressReadFully")
772 val buffers = object : DefaultPool<ByteArray>(10) {
773 override fun produceInstance(): ByteArray {
774 return ByteArray(size)
775 }
776 }
777
778 try {
Roman Elizarov6129c942018-01-10 17:34:40 +0300779 (1..100_000 * stressTestMultiplier).map {
Sergey Mashkov32c90b12017-11-07 19:18:16 +0300780 async(exec) {
781 val channel = ByteBufferChannel(autoFlush = false, pool = pool)
782 val job = launch(exec) {
783 try {
784 channel.writeFully(data)
785 } finally {
786 channel.close()
787 }
788 }
789
790 yield()
791 val buffer = buffers.borrow()
792 channel.readFully(buffer)
793 buffers.recycle(buffer)
794 job.cancel()
795 }
796 }.forEach {
797 it.await()
798 }
799 } finally {
800 exec.close()
801 }
802 }
803
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300804 @Test
Sergey Mashkovd489d3f2017-11-08 07:15:01 +0300805 fun testJoinToSmokeTest() = runBlocking<Unit> {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300806 val other = ByteBufferChannel(autoFlush = false, pool = pool)
Sergey Mashkovd489d3f2017-11-08 07:15:01 +0300807 launch(coroutineContext) {
808 ch.joinTo(other, false)
809 }
810 yield()
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300811
812 ch.writeInt(0x11223344)
813 ch.flush()
814 assertEquals(0x11223344, other.readInt())
Sergey Mashkovd489d3f2017-11-08 07:15:01 +0300815
816 ch.close()
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300817 }
818
819 @Test
Sergey Mashkovc9d73f52017-11-09 21:18:33 +0300820 fun testJoinToResumeRead() = runBlocking<Unit> {
821 val other = ByteBufferChannel(autoFlush = true, pool = pool)
822 val result = async(coroutineContext) {
823 other.readLong()
824 }
825 yield()
826
827 launch(coroutineContext) {
828 ch.joinTo(other, true)
829 }
830 yield()
831 yield()
832
833 ch.writeLong(0x1122334455667788L)
834 yield()
835 assertEquals(0x1122334455667788L, result.await())
836
837 ch.close()
838 }
839
840 @Test
Sergey Mashkovd489d3f2017-11-08 07:15:01 +0300841 fun testJoinToAfterWrite() = runBlocking<Unit> {
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300842 val other = ByteBufferChannel(autoFlush = false, pool = pool)
843
844 ch.writeInt(0x12345678)
Sergey Mashkovd489d3f2017-11-08 07:15:01 +0300845 launch(coroutineContext) {
846 ch.joinTo(other, false)
847 }
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300848
849 ch.writeInt(0x11223344)
850 ch.flush()
851
Sergey Mashkovfd85fb92017-11-08 22:27:18 +0300852 yield()
853
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300854 assertEquals(0x12345678, other.readInt())
855 assertEquals(0x11223344, other.readInt())
Sergey Mashkovd489d3f2017-11-08 07:15:01 +0300856 ch.close()
Sergey Mashkovc51ecd32017-10-27 22:42:39 +0300857 }
858
Sergey Mashkov9e9929d2017-11-14 14:52:32 +0300859 @Test
860 fun testJoinToClosed() = runBlocking<Unit> {
861 val other = ByteBufferChannel(autoFlush = false, pool = pool)
862
863 ch.writeInt(0x11223344)
864 ch.close()
865
866 ch.joinTo(other, true)
867 yield()
868
869 assertEquals(0x11223344, other.readInt())
870 assertTrue { other.isClosedForRead }
871 }
Sergey Mashkov62deb432017-11-09 19:50:26 +0300872
873 @Test
Sergey Mashkov8af89372017-12-05 12:35:56 +0300874 fun testJoinToDifferentEndian() = runBlocking {
875 val other = ByteBufferChannel(autoFlush = true, pool = pool)
876 other.writeByteOrder = ByteOrder.LITTLE_ENDIAN
877 ch.writeByteOrder = ByteOrder.BIG_ENDIAN
878
879 ch.writeInt(0x11223344) // BE
880
881 launch(coroutineContext) {
882 ch.joinTo(other, true)
883 }
884
885 yield()
886
887 ch.writeInt(0x55667788) // BE
888 ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
889 ch.writeInt(0x0abbccdd) // LE
890 ch.close()
891
892 other.readByteOrder = ByteOrder.BIG_ENDIAN
893 assertEquals(0x11223344, other.readInt()) // BE
894 assertEquals(0x55667788, other.readInt()) // BE
895 other.readByteOrder = ByteOrder.LITTLE_ENDIAN
896 assertEquals(0x0abbccdd, other.readInt()) // LE
897 }
898
899 @Test
Sergey Mashkov62deb432017-11-09 19:50:26 +0300900 fun testReadThenRead() = runBlocking<Unit> {
901 val phase = AtomicInteger(0)
902
903 val first = launch(coroutineContext) {
904 try {
905 ch.readInt()
906 fail("EOF expected")
907 } catch (expected: ClosedReceiveChannelException) {
908 assertEquals(1, phase.get())
909 }
910 }
911
912 yield()
913
914 val second = launch(coroutineContext) {
915 try {
916 ch.readInt()
917 fail("Should fail with ISE")
918 } catch (expected: IllegalStateException) {
919 }
920 }
921
922 yield()
923 phase.set(1)
924 ch.close()
925
926 yield()
927
928 first.invokeOnCompletion { t ->
929 t?.let { throw it }
930 }
931 second.invokeOnCompletion { t ->
932 t?.let { throw it }
933 }
934 }
935
936 @Test
Sergey Mashkov98c87392017-11-20 17:58:28 +0300937 fun writeThenReadStress() = runBlocking<Unit> {
Roman Elizarov6129c942018-01-10 17:34:40 +0300938 for (i in 1..50_000 * stressTestMultiplier) {
Sergey Mashkov98c87392017-11-20 17:58:28 +0300939 val a = ByteBufferChannel(false, pool)
940
941 val w = launch {
942 a.writeLong(1)
943 a.close()
944 }
945 val r = launch {
946 a.readLong()
947 }
948
949 w.join()
950 r.join()
951 }
952
953 ch.close()
954 }
955
956 @Test
957 fun joinToEmptyStress() = runBlocking<Unit> {
Roman Elizarov6129c942018-01-10 17:34:40 +0300958 for (i in 1..50_000 * stressTestMultiplier) {
Sergey Mashkov98c87392017-11-20 17:58:28 +0300959 val a = ByteBufferChannel(false, pool)
960
961 launch(coroutineContext) {
962 a.joinTo(ch, true)
963 }
964
965 yield()
966
967 a.close()
968 }
969 }
970
971 @Test
Sergey Mashkov62deb432017-11-09 19:50:26 +0300972 fun testJoinToStress() = runBlocking<Unit> {
Roman Elizarov6129c942018-01-10 17:34:40 +0300973 for (i in 1..10000 * stressTestMultiplier) {
Sergey Mashkov62deb432017-11-09 19:50:26 +0300974 val child = ByteBufferChannel(false, pool)
975 val writer = launch {
Sergey Mashkov98c87392017-11-20 17:58:28 +0300976 child.writeLong(999 + i.toLong())
Sergey Mashkov62deb432017-11-09 19:50:26 +0300977 child.close()
978 }
979
980 child.joinTo(ch, false)
Sergey Mashkov98c87392017-11-20 17:58:28 +0300981 assertEquals(999 + i.toLong(), ch.readLong())
982 writer.join()
Sergey Mashkov62deb432017-11-09 19:50:26 +0300983 }
Sergey Mashkov98c87392017-11-20 17:58:28 +0300984
985 assertEquals(0, ch.availableForRead)
986 ch.close()
Sergey Mashkov62deb432017-11-09 19:50:26 +0300987 }
988
Sergey Mashkov2fdb5aa2017-11-08 16:50:13 +0300989 @Test
990 fun testSequentialJoin() = runBlocking<Unit> {
Roman Elizarov6129c942018-01-10 17:34:40 +0300991 val steps = 20_000 * stressTestMultiplier
Sergey Mashkov2fdb5aa2017-11-08 16:50:13 +0300992
993 val pipeline = launch(coroutineContext) {
994 for (i in 1..steps) {
995 val child = ByteBufferChannel(false, pool)
996 launch(coroutineContext) {
997 child.writeInt(i)
998 child.close()
999 }
1000 child.joinTo(ch, false)
1001 }
1002 }
1003
1004 for (i in 1..steps) {
1005 assertEquals(i, ch.readInt())
1006 }
1007
1008 pipeline.join()
1009 pipeline.invokeOnCompletion { cause ->
1010 cause?.let { throw it }
1011 }
1012 }
1013
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001014 @Test
1015 fun testSequentialJoinYield() = runBlocking<Unit> {
Roman Elizarov6129c942018-01-10 17:34:40 +03001016 val steps = 20_000 * stressTestMultiplier
Sergey Mashkovfd85fb92017-11-08 22:27:18 +03001017
1018 val pipeline = launch(coroutineContext) {
1019 for (i in 1..steps) {
1020 val child = ByteBufferChannel(false, pool)
1021 launch(coroutineContext) {
1022 child.writeInt(i)
1023 child.close()
1024 }
1025 yield()
1026 child.joinTo(ch, false)
1027 }
1028 }
1029
1030 for (i in 1..steps) {
1031 assertEquals(i, ch.readInt())
1032 }
1033
1034 pipeline.join()
1035 pipeline.invokeOnCompletion { cause ->
1036 cause?.let { throw it }
1037 }
1038 }
1039
Sergey Mashkovde8e44f2017-12-05 15:41:23 +03001040 @Test
1041 fun testReadBlock() = runBlocking<Unit> {
1042 var bytesRead = 0L
1043
1044 val r: (ByteBuffer) -> Unit = { bb ->
1045 bytesRead += bb.remaining()
1046 bb.position(bb.limit())
1047 }
1048
1049 val j = launch(coroutineContext) {
1050 while (!ch.isClosedForRead) {
1051 ch.read(0, r)
1052 }
1053 }
1054
1055 yield()
1056
1057 ch.writeStringUtf8("OK\n")
1058 ch.close()
1059
1060 j.join()
1061 j.invokeOnCompletion {
1062 it?.let { throw it }
1063 }
1064 }
1065
1066 @Test
1067 fun testReadBlock2() = runBlocking<Unit> {
1068 var bytesRead = 0L
1069
1070 val r: (ByteBuffer) -> Unit = { bb ->
1071 bytesRead += bb.remaining()
1072 bb.position(bb.limit())
1073 }
1074
1075 val j = launch(coroutineContext) {
1076 while (!ch.isClosedForRead) {
1077 ch.read(0, r)
1078 }
1079 }
1080
1081 ch.writeStringUtf8("OK\n")
1082 yield()
1083 ch.close()
1084
1085 j.join()
1086 j.invokeOnCompletion {
1087 it?.let { throw it }
1088 }
1089 }
1090
Sergey Mashkova26b7902017-12-05 16:06:03 +03001091 @Test
1092 fun testCancelWriter() = runBlocking {
1093 val sub = writer(DefaultDispatcher) {
1094 delay(1000000L)
1095 }
1096
1097 sub.channel.cancel()
1098 sub.join()
1099 }
1100
1101 @Test
1102 fun testCancelReader() = runBlocking {
1103 val sub = reader(DefaultDispatcher) {
1104 delay(10000000L)
1105 }
1106
1107 sub.channel.close(CancellationException())
1108 sub.join()
1109 }
Sergey Mashkovde8e44f2017-12-05 15:41:23 +03001110
Sergey Mashkov50a08572017-10-09 12:56:07 +03001111 private inline fun buildPacket(block: ByteWritePacket.() -> Unit): ByteReadPacket {
1112 val builder = BytePacketBuilder(0, pktPool)
1113 try {
1114 block(builder)
1115 return builder.build()
1116 } catch (t: Throwable) {
1117 builder.release()
1118 throw t
1119 }
1120 }
Sergey Mashkov86f70612017-07-25 10:59:44 +03001121}