blob: 03dd75d7fd50aacb1146be47973c79106d3d5b41 [file] [log] [blame]
Sergey Mashkov86f70612017-07-25 10:59:44 +03001package kotlinx.coroutines.experimental.io
2
Sergey Mashkov32c90b12017-11-07 19:18:16 +03003import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +03004import kotlinx.coroutines.experimental.CancellationException
5import kotlinx.coroutines.experimental.channels.*
6import kotlinx.coroutines.experimental.io.internal.*
7import kotlinx.coroutines.experimental.io.packet.*
Sergey Mashkov50a08572017-10-09 12:56:07 +03008import kotlinx.coroutines.experimental.io.packet.ByteReadPacket
Roman Elizarov9fe5f462018-02-21 19:05:52 +03009import kotlinx.io.core.*
10import kotlinx.io.pool.*
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +030011import org.junit.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030012import org.junit.Test
13import org.junit.rules.*
Roman Elizarov96d7a882017-08-30 15:17:05 +030014import java.nio.CharBuffer
Sergey Mashkov86f70612017-07-25 10:59:44 +030015import java.util.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030016import java.util.concurrent.*
17import java.util.concurrent.atomic.*
18import kotlin.coroutines.experimental.*
19import kotlin.test.*
Sergey Mashkov86f70612017-07-25 10:59:44 +030020
Roman Elizarov6129c942018-01-10 17:34:40 +030021class ByteBufferChannelTest : TestBase() {
/** Per-test time limit: 100 seconds multiplied by [stressTestMultiplier]. */
@get:Rule
val timeout = Timeout(100L * stressTestMultiplier, TimeUnit.SECONDS)

/**
 * Collects errors reported by background coroutines so that they fail the test.
 *
 * Deliberately public: JUnit 4 only honours `@Rule` members that are public, so a private
 * property annotated with `@get:Rule` exposes nothing JUnit can pick up and the collected
 * errors would not be verified when the test finishes.
 */
@get:Rule
val failures = ErrorCollector()

/**
 * Buffer pool handed to every channel created by this test. Each [borrow][NoPoolImpl.borrow]
 * allocates a fresh 4096-byte heap buffer state.
 * NOTE(review): [VerifyingObjectPool] presumably checks that borrowed instances are returned — confirm.
 */
@get:Rule
internal val pool = VerifyingObjectPool(object : NoPoolImpl<ReadWriteBufferState.Initial>() {
    override fun borrow(): ReadWriteBufferState.Initial {
        return ReadWriteBufferState.Initial(java.nio.ByteBuffer.allocate(4096))
    }
})

/** Packet buffer pool wrapped in a [VerifyingObjectPool] and registered as a rule. */
@get:Rule
internal val pktPool = VerifyingObjectPool(BufferView.Pool)

/** `BUFFER_SIZE` minus `RESERVED_SIZE`; the edge tests use it to decide how many values to write. */
private val Size = BUFFER_SIZE - RESERVED_SIZE

/** Channel under test; created without auto-flush, so tests flush explicitly. */
private val ch = ByteBufferChannel(autoFlush = false, pool = pool)
Sergey Mashkov86f70612017-07-25 10:59:44 +030040
/** Runs after every test and closes the shared channel with an [InterruptedException] as the cause. */
@After
fun finish() {
    val cause = InterruptedException()
    ch.close(cause)
}
45
/** Writes `true` and then `false`, flushing after each write, and checks each value is read back. */
@Test
fun testBoolean() = runTest {
    for (value in booleanArrayOf(true, false)) {
        ch.writeBoolean(value)
        ch.flush()
        assertEquals(value, ch.readBoolean())
    }
}
58
/** Round-trips a single byte and checks `availableForRead` before the write, after the flush and after the read. */
@Test
fun testByte() = runTest {
    assertEquals(0, ch.availableForRead)

    ch.writeByte(-1)
    ch.flush()
    assertEquals(1, ch.availableForRead)

    val actual = ch.readByte()
    assertEquals(-1, actual)
    assertEquals(0, ch.availableForRead)
}
70
/**
 * Round-trips a short with both byte orders set to big-endian.
 * Also checks that the written bytes only become readable after `flush()`.
 */
@Test
fun testShortB() = runTest {
    ch.apply {
        readByteOrder = ByteOrder.BIG_ENDIAN
        writeByteOrder = ByteOrder.BIG_ENDIAN
    }
    assertEquals(0, ch.availableForRead)

    ch.writeShort(-1)
    // Nothing has been flushed yet, so nothing is readable.
    assertEquals(0, ch.availableForRead)

    ch.flush()
    assertEquals(2, ch.availableForRead)

    val actual = ch.readShort()
    assertEquals(-1, actual)
    assertEquals(0, ch.availableForRead)
}
86
/**
 * Round-trips a short with both byte orders set to little-endian.
 * Also checks that the written bytes only become readable after `flush()`.
 */
@Test
fun testShortL() = runTest {
    ch.apply {
        readByteOrder = ByteOrder.LITTLE_ENDIAN
        writeByteOrder = ByteOrder.LITTLE_ENDIAN
    }
    assertEquals(0, ch.availableForRead)

    ch.writeShort(-1)
    // Nothing has been flushed yet, so nothing is readable.
    assertEquals(0, ch.availableForRead)

    ch.flush()
    assertEquals(2, ch.availableForRead)

    val actual = ch.readShort()
    assertEquals(-1, actual)
    assertEquals(0, ch.availableForRead)
}
102
/**
 * Writes one byte followed by shorts up to [Size], frees the leading byte, then writes one more
 * short and checks that it is the last short read back.
 */
@Test
fun testShortEdge() = runTest {
    ch.writeByte(1)
    for (offset in 2 until Size step 2) {
        ch.writeShort(0x00ee)
    }
    ch.flush()

    // Consume the single leading byte, then append the marker value.
    ch.readByte()
    ch.writeShort(0x1234)
    ch.flush()

    // Drain every filler short; only the marker is left afterwards.
    while (ch.availableForRead > 2) {
        ch.readShort()
    }
    assertEquals(0x1234, ch.readShort())
}
126
/** Round-trips an int with both byte orders set to big-endian and checks the readable byte counts. */
@Test
fun testIntB() = runTest {
    ch.apply {
        readByteOrder = ByteOrder.BIG_ENDIAN
        writeByteOrder = ByteOrder.BIG_ENDIAN
    }
    assertEquals(0, ch.availableForRead)

    ch.writeInt(-1)
    ch.flush()
    assertEquals(4, ch.availableForRead)

    val actual = ch.readInt()
    assertEquals(-1, actual)
    assertEquals(0, ch.availableForRead)
}
141
/** Round-trips an int with both byte orders set to little-endian and checks the readable byte counts. */
@Test
fun testIntL() = runTest {
    ch.apply {
        readByteOrder = ByteOrder.LITTLE_ENDIAN
        writeByteOrder = ByteOrder.LITTLE_ENDIAN
    }
    assertEquals(0, ch.availableForRead)

    ch.writeInt(-1)
    ch.flush()
    assertEquals(4, ch.availableForRead)

    val actual = ch.readInt()
    assertEquals(-1, actual)
    assertEquals(0, ch.availableForRead)
}
156
/**
 * For byte shifts 1..3: writes `shift` bytes followed by `Size / 4 - 1` filler ints, consumes the
 * leading bytes, appends a marker int and checks that the marker is the last int read back.
 */
@Test
fun testIntEdge() = runTest {
    for (shift in 1..3) {
        repeat(shift) { ch.writeByte(1) }
        for (n in 1 until Size / 4) {
            ch.writeInt(0xeeeeeeeeL)
        }
        ch.flush()

        repeat(shift) { ch.readByte() }

        ch.writeInt(0x12345678)
        ch.flush()

        // Drain the filler ints; only the marker remains afterwards.
        while (ch.availableForRead > 4) {
            ch.readInt()
        }
        assertEquals(0x12345678, ch.readInt())
    }
}
187
/**
 * Same layout as the int edge test, but the trailing value is written as the four separate bytes
 * 0x12, 0x34, 0x56, 0x78 and then read back with a single `readInt()`.
 */
@Test
fun testIntEdge2() = runTest {
    for (shift in 1..3) {
        repeat(shift) { ch.writeByte(1) }
        for (n in 1 until Size / 4) {
            ch.writeInt(0xeeeeeeeeL)
        }
        ch.flush()

        repeat(shift) { ch.readByte() }

        // Marker written byte by byte.
        ch.writeByte(0x12)
        ch.writeByte(0x34)
        ch.writeByte(0x56)
        ch.writeByte(0x78)
        ch.flush()

        // Drain the filler ints; only the marker remains afterwards.
        while (ch.availableForRead > 4) {
            ch.readInt()
        }
        assertEquals(0x12345678, ch.readInt())
    }
}
221
222
/** Round-trips `Long.MIN_VALUE` with both byte orders set to big-endian and checks the readable byte counts. */
@Test
fun testLongB() = runTest {
    ch.apply {
        readByteOrder = ByteOrder.BIG_ENDIAN
        writeByteOrder = ByteOrder.BIG_ENDIAN
    }
    assertEquals(0, ch.availableForRead)

    ch.writeLong(Long.MIN_VALUE)
    ch.flush()
    assertEquals(8, ch.availableForRead)

    val actual = ch.readLong()
    assertEquals(Long.MIN_VALUE, actual)
    assertEquals(0, ch.availableForRead)
}
237
/** Round-trips `Long.MIN_VALUE` with both byte orders set to little-endian and checks the readable byte counts. */
@Test
fun testLongL() = runTest {
    ch.apply {
        readByteOrder = ByteOrder.LITTLE_ENDIAN
        writeByteOrder = ByteOrder.LITTLE_ENDIAN
    }
    assertEquals(0, ch.availableForRead)

    ch.writeLong(Long.MIN_VALUE)
    ch.flush()
    assertEquals(8, ch.availableForRead)

    val actual = ch.readLong()
    assertEquals(Long.MIN_VALUE, actual)
    assertEquals(0, ch.availableForRead)
}
252
/**
 * For byte shifts 1..7: writes `shift` bytes followed by `Size / 8 - 1` filler longs, consumes the
 * leading bytes, appends a marker long and checks that the marker is the last long read back.
 */
@Test
fun testLongEdge() = runTest {
    for (shift in 1..7) {
        repeat(shift) { ch.writeByte(1) }
        for (n in 1 until Size / 8) {
            ch.writeLong(0x11112222eeeeeeeeL)
        }
        ch.flush()

        repeat(shift) { ch.readByte() }

        ch.writeLong(0x1234567812345678L)
        ch.flush()

        // Drain the filler longs; only the marker remains afterwards.
        while (ch.availableForRead > 8) {
            ch.readLong()
        }
        assertEquals(0x1234567812345678L, ch.readLong())
    }
}
281
/** Round-trips the double 1.05 with both byte orders set to big-endian and checks the readable byte counts. */
@Test
fun testDoubleB() = runTest {
    ch.apply {
        readByteOrder = ByteOrder.BIG_ENDIAN
        writeByteOrder = ByteOrder.BIG_ENDIAN
    }
    assertEquals(0, ch.availableForRead)

    ch.writeDouble(1.05)
    ch.flush()
    assertEquals(8, ch.availableForRead)

    val actual = ch.readDouble()
    assertEquals(1.05, actual)
    assertEquals(0, ch.availableForRead)
}
297
/** Round-trips the double 1.05 with both byte orders set to little-endian and checks the readable byte counts. */
@Test
fun testDoubleL() = runTest {
    ch.apply {
        readByteOrder = ByteOrder.LITTLE_ENDIAN
        writeByteOrder = ByteOrder.LITTLE_ENDIAN
    }
    assertEquals(0, ch.availableForRead)

    ch.writeDouble(1.05)
    ch.flush()
    assertEquals(8, ch.availableForRead)

    val actual = ch.readDouble()
    assertEquals(1.05, actual)
    assertEquals(0, ch.availableForRead)
}
313
/** Round-trips the float 1.05f with both byte orders set to big-endian and checks the readable byte counts. */
@Test
fun testFloatB() = runTest {
    ch.apply {
        readByteOrder = ByteOrder.BIG_ENDIAN
        writeByteOrder = ByteOrder.BIG_ENDIAN
    }
    assertEquals(0, ch.availableForRead)

    ch.writeFloat(1.05f)
    ch.flush()
    assertEquals(4, ch.availableForRead)

    val actual = ch.readFloat()
    assertEquals(1.05f, actual)
    assertEquals(0, ch.availableForRead)
}
329
/** Round-trips the float 1.05f with both byte orders set to little-endian and checks the readable byte counts. */
@Test
fun testFloatL() = runTest {
    ch.apply {
        readByteOrder = ByteOrder.LITTLE_ENDIAN
        writeByteOrder = ByteOrder.LITTLE_ENDIAN
    }
    assertEquals(0, ch.availableForRead)

    ch.writeFloat(1.05f)
    ch.flush()
    assertEquals(4, ch.availableForRead)

    val actual = ch.readFloat()
    assertEquals(1.05f, actual)
    assertEquals(0, ch.availableForRead)
}
345
Roman Elizarovfccac1d2017-08-15 11:58:43 +0300346
347
/**
 * Tries every combination of write and read byte order. When the two orders match, the value read
 * equals the value written; otherwise the byte-swapped value is expected.
 */
@Test
fun testEndianMix() {
    val byteOrders = listOf(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN)
    runTest {
        for (writeOrder in byteOrders) {
            ch.writeByteOrder = writeOrder

            for (readOrder in byteOrders) {
                ch.readByteOrder = readOrder
                val swapped = writeOrder != readOrder

                // The short round trip is performed twice in a row.
                repeat(2) {
                    assertEquals(0, ch.availableForRead)
                    ch.writeShort(0x001f)
                    ch.flush()
                    if (swapped)
                        assertEquals(0x1f00, ch.readShort())
                    else
                        assertEquals(0x001f, ch.readShort())
                }

                // Int written from an Int literal.
                assertEquals(0, ch.availableForRead)
                ch.writeInt(0x1f)
                ch.flush()
                if (swapped)
                    assertEquals(0x1f000000, ch.readInt())
                else
                    assertEquals(0x0000001f, ch.readInt())

                // Int written from a Long literal.
                assertEquals(0, ch.availableForRead)
                ch.writeInt(0x1fL)
                ch.flush()
                if (swapped)
                    assertEquals(0x1f000000, ch.readInt())
                else
                    assertEquals(0x0000001f, ch.readInt())

                assertEquals(0, ch.availableForRead)
                ch.writeLong(0x1f)
                ch.flush()
                if (swapped)
                    assertEquals(0x1f00000000000000L, ch.readLong())
                else
                    assertEquals(0x1f, ch.readLong())
            }
        }
    }
}
401
/**
 * Checks that bytes flushed before `close()` can still be read afterwards, and that reading past
 * them throws [ClosedReceiveChannelException].
 */
@Test
fun testClose() = runTest {
    ch.writeByte(1)
    ch.writeByte(2)
    ch.writeByte(3)
    ch.flush()

    assertEquals(1, ch.readByte())
    ch.close()

    // The two remaining bytes are still readable after the close.
    assertEquals(2, ch.readByte())
    assertEquals(3, ch.readByte())

    val rejected = try {
        ch.readByte()
        false
    } catch (expected: ClosedReceiveChannelException) {
        true
    }
    if (!rejected) fail()
}
423
/**
 * Exercises `writeFully` / `readFully` with byte arrays: a full round trip, a partial read that
 * leaves one byte behind, and a `readFully` on the closed, empty channel that must throw
 * [ClosedReceiveChannelException].
 */
@Test
fun testReadAndWriteFully() = runTest {
    val payload = byteArrayOf(1, 2, 3, 4, 5)
    val target = ByteArray(5)

    ch.writeFully(payload)
    ch.flush()
    assertEquals(5, ch.availableForRead)
    ch.readFully(target)
    assertTrue { target.contentEquals(payload) }

    ch.writeFully(payload)
    ch.flush()

    // Read only four of the five bytes; the last one stays in the channel.
    val firstFour = ByteArray(4)
    ch.readFully(firstFour)
    assertEquals(1, ch.availableForRead)
    assertEquals(5, ch.readByte())

    ch.close()

    val rejected = try {
        ch.readFully(target)
        false
    } catch (expected: ClosedReceiveChannelException) {
        true
    }
    if (!rejected) fail("")
}
454
/**
 * ByteBuffer variant of the fully read/write test: a full round trip, a partial read that leaves
 * one byte behind, and a `readFully` on the closed, empty channel that must throw
 * [ClosedReceiveChannelException].
 */
@Test
fun testReadAndWriteFullyByteBuffer() = runTest {
    val payload = byteArrayOf(1, 2, 3, 4, 5)
    val target = ByteArray(5)

    ch.writeFully(ByteBuffer.wrap(payload))
    ch.flush()
    assertEquals(5, ch.availableForRead)
    ch.readFully(ByteBuffer.wrap(target))
    assertTrue { target.contentEquals(payload) }

    ch.writeFully(ByteBuffer.wrap(payload))
    ch.flush()

    // Read only four of the five bytes; the last one stays in the channel.
    val firstFour = ByteArray(4)
    ch.readFully(ByteBuffer.wrap(firstFour))
    assertEquals(1, ch.availableForRead)
    assertEquals(5, ch.readByte())

    ch.close()

    val rejected = try {
        ch.readFully(ByteBuffer.wrap(target))
        false
    } catch (expected: ClosedReceiveChannelException) {
        true
    }
    if (!rejected) fail("")
}
485
/**
 * Exercises `writeAvailable` / `readAvailable` with byte arrays: a small round trip, then repeated
 * writes (each must accept at least one byte), then a drain that leaves exactly one byte.
 */
@Test
fun testReadAndWritePartially() = runTest {
    val chunk = byteArrayOf(1, 2, 3, 4, 5)

    assertEquals(5, ch.writeAvailable(chunk))
    ch.flush()
    assertEquals(5, ch.readAvailable(ByteArray(100)))

    for (round in 1..Size / chunk.size) {
        assertNotEquals(0, ch.writeAvailable(chunk))
        ch.flush()
    }

    // Read everything except the final byte, then check exactly one byte is left.
    val allButLast = ByteArray(ch.availableForRead - 1)
    ch.readAvailable(allButLast)
    assertEquals(1, ch.readAvailable(ByteArray(100)))

    ch.close()
}
506
/**
 * ByteBuffer variant of the partial read/write test: a small round trip, then repeated writes
 * (each must accept at least one byte), then a drain that leaves exactly one byte.
 *
 * Every read and write goes through the ByteBuffer overloads, including the drain step, so that
 * this test consistently covers the ByteBuffer code path.
 */
@Test
fun testReadAndWritePartiallyByteBuffer() {
    runTest {
        val bytes = byteArrayOf(1, 2, 3, 4, 5)

        assertEquals(5, ch.writeAvailable(ByteBuffer.wrap(bytes)))
        ch.flush()
        assertEquals(5, ch.readAvailable(ByteBuffer.allocate(100)))

        repeat(Size / bytes.size) {
            assertNotEquals(0, ch.writeAvailable(ByteBuffer.wrap(bytes)))
            ch.flush()
        }

        // Read everything except the final byte, using the ByteBuffer overload like the rest of the test.
        ch.readAvailable(ByteBuffer.allocate(ch.availableForRead - 1))
        assertEquals(1, ch.readAvailable(ByteBuffer.allocate(100)))

        ch.close()
    }
}
527
528
/**
 * A writer coroutine on [CommonPool] writes the same 65536-byte random array 200 times while the
 * test body reads each copy back and compares it. A writer failure is recorded in [failures].
 */
@Test
fun testReadAndWriteBig() {
    val count = 200
    val bytes = ByteArray(65536)
    Random().nextBytes(bytes)

    val writer = launch(CommonPool + CoroutineName("writer")) {
        repeat(count) {
            ch.writeFully(bytes)
            ch.flush()
        }
    }
    writer.invokeOnCompletion { cause ->
        if (cause != null) {
            failures.addError(cause)
        }
    }

    runTest {
        val received = ByteArray(bytes.size)
        repeat(count) {
            ch.readFully(received)
            assertTrue { received.contentEquals(bytes) }
            // Wipe the buffer so a stale copy cannot satisfy the next comparison.
            received.fill(0)
        }
    }
}
555
/**
 * ByteBuffer variant of the big read/write test: a writer coroutine on [CommonPool] writes the
 * same 65536-byte random array 200 times while the test body reads each copy back and compares it.
 * A writer failure is recorded in [failures].
 */
@Test
fun testReadAndWriteBigByteBuffer() {
    val count = 200
    val bytes = ByteArray(65536)
    Random().nextBytes(bytes)

    val writer = launch(CommonPool + CoroutineName("writer")) {
        repeat(count) {
            ch.writeFully(ByteBuffer.wrap(bytes))
            ch.flush()
        }
    }
    writer.invokeOnCompletion { cause ->
        if (cause != null) {
            failures.addError(cause)
        }
    }

    runTest {
        val received = ByteArray(bytes.size)
        repeat(count) {
            ch.readFully(ByteBuffer.wrap(received))
            assertTrue { received.contentEquals(bytes) }
            // Wipe the buffer so a stale copy cannot satisfy the next comparison.
            received.fill(0)
        }
    }
}
Sergey Mashkova10b6922017-08-03 13:41:54 +0300582
/** Writes a length-prefixed packet (an int plus the string "Hello") and reads it back as a packet. */
@Test
fun testPacket() = runTest {
    val outgoing = buildPacket {
        writeInt(0xffee)
        writeStringUtf8("Hello")
    }

    // The packet size goes first so the reader knows how much to request.
    ch.writeInt(outgoing.remaining)
    ch.writePacket(outgoing)
    ch.flush()

    val length = ch.readInt()
    val incoming = ch.readPacket(length)

    assertEquals(0xffee, incoming.readInt())
    assertEquals("Hello", incoming.readUTF8Line())
}
601
/**
 * Like the packet test, but the payload is an int plus 8192 dots and the writing is done by a
 * separate coroutine on [CommonPool] while the test body reads.
 */
@Test
fun testBigPacket() = runTest {
    launch(CommonPool + CoroutineName("writer")) {
        val outgoing = buildPacket {
            writeInt(0xffee)
            writeStringUtf8(".".repeat(8192))
        }

        // The packet size goes first so the reader knows how much to request.
        ch.writeInt(outgoing.remaining)
        ch.writePacket(outgoing)
        ch.flush()
    }

    val length = ch.readInt()
    val incoming = ch.readPacket(length)

    assertEquals(0xffee, incoming.readInt())
    assertEquals(".".repeat(8192), incoming.readUTF8Line())
}
Sergey Mashkovfbcba262017-08-23 12:26:18 +0300622
/** Writes "abc" as UTF-8, closes the channel and reads it back as an ASCII line. */
@Test
fun testWriteString() = runTest {
    ch.writeStringUtf8("abc")
    ch.close()

    val line = ch.readASCIILine()
    assertEquals("abc", line)
}
630
/** Same as the string test, but the argument is passed as a [CharSequence]. */
@Test
fun testWriteCharSequence() = runTest {
    ch.writeStringUtf8("abc" as CharSequence)
    ch.close()

    val line = ch.readASCIILine()
    assertEquals("abc", line)
}
638
/**
 * Writes a [CharBuffer] whose window (position 2, limit 5) covers "abc" inside a space-filled
 * buffer, and checks that exactly "abc" is read back.
 */
@Test
fun testWriteCharBuffer() = runTest {
    val chars = CharBuffer.allocate(6)

    // Fill the whole buffer with spaces using absolute puts (the position does not move).
    repeat(chars.remaining()) { index ->
        chars.put(index, ' ')
    }

    chars.position(2)
    chars.put(2, 'a').put(3, 'b').put(4, 'c')
    chars.limit(5)

    // Sanity check: the window between position and limit is exactly "abc".
    assertEquals("abc", chars.slice().toString())

    ch.writeStringUtf8(chars)
    ch.close()

    assertEquals("abc", ch.readASCIILine())
}
Sergey Mashkov57906eb2017-09-19 14:08:06 +0300660
/**
 * Streams `count` blocks of 8192 random bytes from a "writer" coroutine to a "reader" coroutine.
 * The reader checks the total number of bytes and that the channel then reports end of stream (-1).
 */
@Test
fun testReadAndWriteLarge() = runTest {
    val count = 128L * 1024 * stressTestMultiplier // * 8192 -> 1G * M
    val data = ByteBuffer.allocate(8192)!!
    Random().nextBytes(data.array())

    launch("writer") {
        for (block in 1..count.toInt()) {
            // Rewind so the same 8192 bytes are written again.
            data.clear()
            ch.writeFully(data)
        }
        ch.close()
    }

    launch("reader") {
        val total = count * 8192
        val buffer = ByteBuffer.allocate(8192)!!
        var read = 0L

        while (read < total) {
            buffer.clear()
            val rc = ch.readFully(buffer)
            if (rc == -1) break
            read += rc
        }
        assertEquals(total, read)

        // Nothing more may arrive after the writer has closed the channel.
        buffer.clear()
        assertEquals(-1, ch.readAvailable(buffer))
    }
}
693
/**
 * Same data flow as the large read/write test, but the reader consumes the bytes through
 * `lookAheadSuspend`, requesting buffers and marking them as consumed.
 */
@Test
fun testReadAndWriteLargeViaLookAheadSession() = runTest {
    val count = 128L * 1024 * stressTestMultiplier // * 8192 -> 1G * M
    val data = ByteBuffer.allocate(8192)!!
    Random().nextBytes(data.array())

    launch("writer") {
        for (block in 1..count.toInt()) {
            // Rewind so the same 8192 bytes are written again.
            data.clear()
            ch.writeFully(data)
        }
        ch.close()
    }

    launch("reader") {
        val total = count * 8192
        var read = 0L

        ch.lookAheadSuspend {
            while (read < total) {
                val chunk = request(0, 1)
                if (chunk != null) {
                    // Take everything the buffer offers and report it as consumed.
                    val size = chunk.remaining()
                    chunk.position(chunk.limit())
                    read += size
                    consumed(size)
                } else if (!awaitAtLeast(1)) {
                    // No buffer and awaitAtLeast(1) returned false: stop reading.
                    break
                }
            }
        }

        assertEquals(total, read)
        assertEquals(-1, ch.readAvailable(ByteBuffer.allocate(8192)))
    }
}
730
/**
 * Pipes `count` blocks of 8192 patterned bytes from [ch] into a second channel with
 * `copyAndClose`, each block written in two pieces, while a reader drains the destination and
 * spot-checks the pattern.
 */
@Test
fun testCopyLarge() {
    val count = 100 * 256 * stressTestMultiplier // * 8192

    launch {
        val source = ByteBuffer.allocate(8192)
        // Fill with the repeating pattern 0x00..0xff.
        for (index in 0 until source.capacity()) {
            source.put((index and 0xff).toByte())
        }

        for (i in 1..count) {
            source.clear()
            val split = i and 0x1fff

            // First piece: bytes [0, split). Second piece: the rest of the buffer.
            source.limit(split)
            ch.writeFully(source)
            yield()
            source.limit(source.capacity())
            ch.writeFully(source)
        }

        ch.close()
    }

    val dest = ByteBufferChannel(true, pool)

    val joinerJob = launch {
        ch.copyAndClose(dest)
    }

    val reader = launch {
        val block = ByteBuffer.allocate(8192)

        for (i in 1..count) {
            block.clear()
            dest.readFully(block)
            block.flip()

            // Verify the byte pattern only on blocks whose index has the low 13 bits clear.
            if (i and 0x1fff == 0) {
                for (index in 0 until block.capacity()) {
                    assertEquals((index and 0xff).toByte(), block.get())
                }
            }
        }

        yield()
        assertTrue(dest.isClosedForRead)
    }

    runTest {
        reader.join()
        joinerJob.join()
        dest.close()
        ch.close()
    }
}
787
/**
 * Same data flow as the copy test, but the two channels are connected with `joinTo(dest, true)`.
 * The test thread blocks on a latch that is released once all three jobs have completed, or as
 * soon as any of them completes with an error.
 */
@Test
fun testJoinToLarge() {
    val count = 100 * 256 * stressTestMultiplier // * 8192

    val writerJob = launch {
        val source = ByteBuffer.allocate(8192)
        // Fill with the repeating pattern 0x00..0xff.
        for (index in 0 until source.capacity()) {
            source.put((index and 0xff).toByte())
        }

        for (i in 1..count) {
            source.clear()
            val split = i and 0x1fff

            // First piece: bytes [0, split). Second piece: the rest of the buffer.
            source.limit(split)
            ch.writeFully(source)
            yield()
            source.limit(source.capacity())
            ch.writeFully(source)
        }

        ch.close()
    }

    val dest = ByteBufferChannel(true, pool)

    val joinerJob = launch {
        ch.joinTo(dest, true)
    }

    val reader = launch {
        val block = ByteBuffer.allocate(8192)

        for (i in 1..count) {
            block.clear()
            dest.readFully(block)
            block.flip()

            // Verify the byte pattern only on blocks whose index has the low 13 bits clear.
            if (i and 0x1fff == 0) {
                for (index in 0 until block.capacity()) {
                    assertEquals((index and 0xff).toByte(), block.get())
                }
            }
        }

        // After the last block the destination must report end of stream.
        block.clear()
        assertEquals(-1, dest.readAvailable(block))
        assertTrue(dest.isClosedForRead)
    }

    val latch = CountDownLatch(1)
    val pending = AtomicInteger(3)

    val handler: CompletionHandler = { cause ->
        if (cause != null) {
            failures.addError(cause)
            latch.countDown()
        }
        if (pending.decrementAndGet() == 0) latch.countDown()
    }

    reader.invokeOnCompletion(onCancelling = true, handler = handler)
    writerJob.invokeOnCompletion(onCancelling = true, handler = handler)
    joinerJob.invokeOnCompletion(onCancelling = true, handler = handler)

    latch.await()
}
852
/**
 * Launches [block] on [DefaultDispatcher] as a child of the caller's job, under the coroutine
 * name [name]. If the job completes (or starts cancelling) with a cause, [ch] is cancelled with it.
 *
 * @param name coroutine name, "child" by default
 * @param block code to run in the new coroutine
 * @return the launched job
 */
private suspend fun launch(name: String = "child", block: suspend () -> Unit): Job {
    val job = launch(context = DefaultDispatcher + CoroutineName(name), parent = coroutineContext[Job]) {
        block()
    }
    job.invokeOnCompletion(onCancelling = true) { cause ->
        if (cause != null) ch.cancel(cause)
    }
    return job
}
862
/**
 * Launches [block] on [DefaultDispatcher]. Any [Throwable] thrown by the block is caught and
 * recorded in [failures] rather than propagated.
 *
 * @param block code to run in the new coroutine
 * @return the launched job
 */
private fun launch(block: suspend () -> Unit): Job = launch(DefaultDispatcher) {
    try {
        block()
    } catch (error: Throwable) {
        failures.addError(error)
    }
}
872
/**
 * Stress test: runs `100_000 * stressTestMultiplier` tasks on an 8-thread pool. Each task creates
 * its own channel, starts a writer that writes 100 bytes and closes the channel, and reads those
 * 100 bytes back with `readFully`.
 *
 * The borrowed read buffer is always recycled and the writer job is always cancelled, even when
 * the read fails, so one failing task does not leave pooled buffers or a writer behind.
 */
@Test
fun testStressReadWriteFully() = runTest {
    val size = 100
    val data = ByteArray(size) { it.toByte() }
    val exec = newFixedThreadPoolContext(8, "testStressReadFully")
    val buffers = object : DefaultPool<ByteArray>(10) {
        override fun produceInstance(): ByteArray {
            return ByteArray(size)
        }
    }

    try {
        (1..100_000 * stressTestMultiplier).map {
            async(exec) {
                val channel = ByteBufferChannel(autoFlush = false, pool = pool)
                val job = launch(exec) {
                    try {
                        channel.writeFully(data)
                    } finally {
                        channel.close()
                    }
                }

                try {
                    yield()
                    val buffer = buffers.borrow()
                    try {
                        channel.readFully(buffer)
                    } finally {
                        // Return the buffer to the pool whether or not the read succeeded.
                        buffers.recycle(buffer)
                    }
                } finally {
                    // Stop the writer on both the success and the failure path.
                    job.cancel()
                }
            }
        }.forEach {
            it.await()
        }
    } finally {
        exec.close()
    }
}
909
/** Joins [ch] to a second channel without close-on-end and checks that a flushed int arrives there. */
@Test
fun testJoinToSmokeTest() = runTest {
    val target = ByteBufferChannel(autoFlush = false, pool = pool)

    launch(coroutineContext) {
        ch.joinTo(target, false)
    }
    // Yield so the coroutine launched above gets a chance to run before the write.
    yield()

    ch.writeInt(0x11223344)
    ch.flush()
    val received = target.readInt()
    assertEquals(0x11223344, received)

    ch.close()
}
924
/**
 * Chains three channels with `joinTo` (second -> third launched first, then first -> second) and
 * checks that text written to the first channel is readable from the third.
 */
@Test
fun testJoinToChainSmokeTest1() = runTest {
    val first = ByteBufferChannel(autoFlush = false, pool = pool)
    val middle = ByteBufferChannel(autoFlush = false, pool = pool)
    val last = ByteBufferChannel(autoFlush = false, pool = pool)

    launch(coroutineContext) {
        middle.joinTo(last, closeOnEnd = true)
    }
    launch(coroutineContext) {
        first.joinTo(middle, closeOnEnd = true)
    }

    yield()
    first.writeStringUtf8("OK")
    first.close()

    val line = last.readUTF8Line()
    assertEquals("OK", line)
}
944
/**
 * Chains three channels with `joinTo` (first -> second launched first, then second -> third) and
 * checks that text written to the first channel is readable from the third.
 */
@Test
fun testJoinToChainSmokeTest2() = runTest {
    val first = ByteBufferChannel(autoFlush = false, pool = pool)
    val middle = ByteBufferChannel(autoFlush = false, pool = pool)
    val last = ByteBufferChannel(autoFlush = false, pool = pool)

    launch(coroutineContext) {
        first.joinTo(middle, closeOnEnd = true)
    }
    launch(coroutineContext) {
        middle.joinTo(last, closeOnEnd = true)
    }

    yield()
    first.writeStringUtf8("OK")
    first.close()

    val line = last.readUTF8Line()
    assertEquals("OK", line)
}
964
/**
 * Chain head -> middle -> tail where the line is written and flushed without any
 * prior yield, and the head is closed only after three yields.
 * The line must still be read from the tail.
 */
@Test
fun testJoinToChainSmokeTest3() = runTest {
    val head = ByteBufferChannel(autoFlush = false, pool = pool)
    val middle = ByteBufferChannel(autoFlush = false, pool = pool)
    val tail = ByteBufferChannel(autoFlush = false, pool = pool)

    launch(coroutineContext + CoroutineName("A->B")) {
        head.joinTo(middle, closeOnEnd = true)
    }
    launch(coroutineContext + CoroutineName("B->C")) {
        middle.joinTo(tail, closeOnEnd = true)
    }

    head.writeStringUtf8("OK\n")
    // Flush first; the close happens only after the yields below.
    head.flush()
    repeat(3) { yield() }
    head.close()

    val received = tail.readUTF8Line()
    assertEquals("OK", received)
}
988
/**
 * Chain head -> middle -> tail where the line is written and the head is closed
 * immediately, without yielding to the join coroutines first.
 */
@Test
fun testJoinToChainSmokeTest4() = runTest {
    val head = ByteBufferChannel(autoFlush = false, pool = pool)
    val middle = ByteBufferChannel(autoFlush = false, pool = pool)
    val tail = ByteBufferChannel(autoFlush = false, pool = pool)

    launch(coroutineContext + CoroutineName("A->B")) {
        head.joinTo(middle, closeOnEnd = true)
    }
    launch(coroutineContext + CoroutineName("B->C")) {
        middle.joinTo(tail, closeOnEnd = true)
    }

    head.writeStringUtf8("OK\n")
    head.close()

    val received = tail.readUTF8Line()
    assertEquals("OK", received)
}
1007
/**
 * Joins `ch` to a destination that has been filled until `availableForWrite` reports 0.
 * `ch` holds a single int (777) and is already closed when the join starts; once the
 * filler bytes have been drained from the destination, that int must be read from it.
 */
@Test
fun testJoinToFull() = runTest {
    val destination = ByteBufferChannel(autoFlush = false, pool = pool)

    // Write at least one byte, then keep writing while there is room.
    var fillerCount = 0
    do {
        destination.writeByte(1)
        fillerCount++
    } while (destination.availableForWrite > 0)

    ch.writeInt(777)
    ch.close()

    launch(coroutineContext) {
        ch.joinTo(destination, closeOnEnd = true)
    }

    yield()

    // Drain exactly the filler bytes written above.
    for (index in 0 until fillerCount) {
        destination.readByte()
    }

    assertEquals(777, destination.readInt())
}
1035
/**
 * Chain head -> middle -> tail where the head already contains a flushed "1" before
 * any join is started. The reader on the tail is launched first, then each join link
 * with yields in between; the reader expects the combined line "1OK".
 */
@Test
fun testJoinToChainNonEmpty() = runTest {
    val head = ByteBufferChannel(autoFlush = false, pool = pool)
    val middle = ByteBufferChannel(autoFlush = false, pool = pool)
    val tail = ByteBufferChannel(autoFlush = false, pool = pool)

    head.writeStringUtf8("1")
    head.flush()

    launch(coroutineContext + CoroutineName("Reader")) {
        val line = tail.readUTF8Line()
        assertEquals("1OK", line)
    }
    yield()

    launch(coroutineContext + CoroutineName("A->B")) {
        head.joinTo(middle, closeOnEnd = true)
    }
    yield()

    launch(coroutineContext + CoroutineName("B->C")) {
        middle.joinTo(tail, closeOnEnd = true)
    }
    // Four consecutive yields before the remainder of the line is written.
    repeat(4) { yield() }

    head.writeStringUtf8("OK\n")
    head.close()
}
1069
/**
 * Combines `copyAndClose` (source -> relay) with `joinTo` (relay -> sink).
 * Nothing is written to the source before the coroutines are launched; the whole
 * line "1OK" is written afterwards and expected by the reader on the sink.
 */
@Test
fun testCopyToThenJoinTo() = runTest {
    val source = ByteBufferChannel(autoFlush = false, pool = pool)
    val relay = ByteBufferChannel(autoFlush = false, pool = pool)
    val sink = ByteBufferChannel(autoFlush = false, pool = pool)

    launch(coroutineContext + CoroutineName("A->B")) {
        source.copyAndClose(relay)
    }

    launch(coroutineContext + CoroutineName("Reader")) {
        val line = sink.readUTF8Line()
        assertEquals("1OK", line)
    }

    launch(coroutineContext + CoroutineName("B->C")) {
        relay.joinTo(sink, closeOnEnd = true)
    }

    source.writeStringUtf8("1OK\n")
    source.close()
}
1094
/**
 * `copyAndClose` (source -> relay) followed by `joinTo` (relay -> sink), with "1"
 * already written and flushed to the source before anything is launched.
 * Launch order: copier, reader, joiner; no yields in between.
 */
@Test
fun testCopyToThenJoinTo2() = runTest {
    val source = ByteBufferChannel(autoFlush = false, pool = pool)
    val relay = ByteBufferChannel(autoFlush = false, pool = pool)
    val sink = ByteBufferChannel(autoFlush = false, pool = pool)

    source.writeStringUtf8("1")
    source.flush()

    launch(coroutineContext + CoroutineName("A->B")) {
        source.copyAndClose(relay)
    }

    launch(coroutineContext + CoroutineName("Reader")) {
        val line = sink.readUTF8Line()
        assertEquals("1OK", line)
    }

    launch(coroutineContext + CoroutineName("B->C")) {
        relay.joinTo(sink, closeOnEnd = true)
    }

    source.writeStringUtf8("OK\n")
    source.close()
}
1119
/**
 * `copyAndClose` (source -> relay) followed by `joinTo` (relay -> sink), with "1"
 * pre-written to the source. The copier is launched and yielded to before the
 * reader and the joiner are started.
 */
@Test
fun testCopyToThenJoinTo3() = runTest {
    val source = ByteBufferChannel(autoFlush = false, pool = pool)
    val relay = ByteBufferChannel(autoFlush = false, pool = pool)
    val sink = ByteBufferChannel(autoFlush = false, pool = pool)

    source.writeStringUtf8("1")
    source.flush()

    launch(coroutineContext + CoroutineName("A->B")) {
        source.copyAndClose(relay)
    }

    // Let the copier run before the rest of the pipeline exists.
    yield()

    launch(coroutineContext + CoroutineName("Reader")) {
        val line = sink.readUTF8Line()
        assertEquals("1OK", line)
    }

    launch(coroutineContext + CoroutineName("B->C")) {
        relay.joinTo(sink, closeOnEnd = true)
    }

    source.writeStringUtf8("OK\n")
    source.close()
}
1146
/**
 * `copyAndClose` (source -> relay) followed by `joinTo` (relay -> sink), with "1"
 * pre-written to the source. The reader on the sink is launched and yielded to
 * first, then the copier and the joiner are started.
 */
@Test
fun testCopyToThenJoinTo4() = runTest {
    val source = ByteBufferChannel(autoFlush = false, pool = pool)
    val relay = ByteBufferChannel(autoFlush = false, pool = pool)
    val sink = ByteBufferChannel(autoFlush = false, pool = pool)

    source.writeStringUtf8("1")
    source.flush()

    launch(coroutineContext + CoroutineName("Reader")) {
        val line = sink.readUTF8Line()
        assertEquals("1OK", line)
    }

    // Let the reader start before any data can reach the sink.
    yield()

    launch(coroutineContext + CoroutineName("A->B")) {
        source.copyAndClose(relay)
    }

    launch(coroutineContext + CoroutineName("B->C")) {
        relay.joinTo(sink, closeOnEnd = true)
    }

    source.writeStringUtf8("OK\n")
    source.close()
}
1173
/**
 * `copyAndClose` (source -> relay) combined with `joinTo` (relay -> sink), with "1"
 * pre-written to the source. Launch order: joiner, yield, reader, yield, copier.
 */
@Test
fun testCopyToThenJoinTo5() = runTest {
    val source = ByteBufferChannel(autoFlush = false, pool = pool)
    val relay = ByteBufferChannel(autoFlush = false, pool = pool)
    val sink = ByteBufferChannel(autoFlush = false, pool = pool)

    source.writeStringUtf8("1")
    source.flush()

    // The join is set up while the relay is still empty.
    launch(coroutineContext + CoroutineName("B->C")) {
        relay.joinTo(sink, closeOnEnd = true)
    }

    yield()

    launch(coroutineContext + CoroutineName("Reader")) {
        val line = sink.readUTF8Line()
        assertEquals("1OK", line)
    }
    yield()

    launch(coroutineContext + CoroutineName("A->B")) {
        source.copyAndClose(relay)
    }

    source.writeStringUtf8("OK\n")
    source.close()
}
1201
/**
 * `copyAndClose` (source -> relay) combined with `joinTo` (relay -> sink), with "1"
 * pre-written to the source. Launch order: joiner, yield, copier, yield, reader.
 */
@Test
fun testCopyToThenJoinTo6() = runTest {
    val source = ByteBufferChannel(autoFlush = false, pool = pool)
    val relay = ByteBufferChannel(autoFlush = false, pool = pool)
    val sink = ByteBufferChannel(autoFlush = false, pool = pool)

    source.writeStringUtf8("1")
    source.flush()

    launch(coroutineContext + CoroutineName("B->C")) {
        relay.joinTo(sink, closeOnEnd = true)
    }

    yield()
    launch(coroutineContext + CoroutineName("A->B")) {
        source.copyAndClose(relay)
    }

    yield()

    // The reader is the last coroutine to be started in this variant.
    launch(coroutineContext + CoroutineName("Reader")) {
        val line = sink.readUTF8Line()
        assertEquals("1OK", line)
    }

    source.writeStringUtf8("OK\n")
    source.close()
}
1229
/**
 * Joins an already closed `ch` (holding one int) to a fresh channel directly from the
 * test coroutine. The int must arrive and nothing else must remain to be read.
 */
@Test
fun testJoinClosed() = runTest {
    ch.writeInt(777)
    ch.close()

    val target = ByteBufferChannel(autoFlush = false, pool = pool)
    ch.joinTo(target, closeOnEnd = true)

    assertEquals(777, target.readInt())

    // Whatever is left after the int must be an empty packet.
    val leftover = target.readRemaining()
    assertEquals(0, leftover.remaining)
}
1241
/**
 * A `readLong` is started on the destination before `ch` is joined to it.
 * A long written to `ch` afterwards must be delivered to that pending read.
 */
@Test
fun testJoinToResumeRead() = runTest {
    val destination = ByteBufferChannel(autoFlush = true, pool = pool)
    val pendingRead = async(coroutineContext) {
        destination.readLong()
    }
    yield()

    launch(coroutineContext) {
        ch.joinTo(destination, closeOnEnd = true)
    }
    repeat(2) { yield() }

    val payload = 0x1122334455667788L
    ch.writeLong(payload)
    yield()
    assertEquals(payload, pendingRead.await())

    ch.close()
}
1262
/**
 * Writes one int to `ch` before the join coroutine is launched and another one after,
 * then flushes. Both ints must be read from the destination in write order.
 */
@Test
fun testJoinToAfterWrite() = runTest {
    val destination = ByteBufferChannel(autoFlush = false, pool = pool)
    val beforeJoin = 0x12345678
    val afterJoin = 0x11223344

    ch.writeInt(beforeJoin)
    launch(coroutineContext) {
        ch.joinTo(destination, closeOnEnd = false)
    }

    ch.writeInt(afterJoin)
    ch.flush()

    yield()

    assertEquals(beforeJoin, destination.readInt())
    assertEquals(afterJoin, destination.readInt())
    ch.close()
}
1281
/**
 * Joins an already closed `ch` with `closeOnEnd = true`: the destination must yield
 * the written int and then report `isClosedForRead`.
 */
@Test
fun testJoinToClosed() = runTest {
    val destination = ByteBufferChannel(autoFlush = false, pool = pool)
    val payload = 0x11223344

    ch.writeInt(payload)
    ch.close()

    ch.joinTo(destination, closeOnEnd = true)
    yield()

    assertEquals(payload, destination.readInt())
    assertTrue(destination.isClosedForRead)
}
Sergey Mashkov62deb432017-11-09 19:50:26 +03001295
/**
 * Joins `ch` to a destination whose own write byte order (little endian) differs from
 * the order `ch` starts with (big endian). Two ints are written to `ch` as big endian
 * and one as little endian; each must be read back from the destination when its
 * read byte order is set to the order the value was written with.
 */
@Test
fun testJoinToDifferentEndian() = runTest {
    val destination = ByteBufferChannel(autoFlush = true, pool = pool)
    destination.writeByteOrder = ByteOrder.LITTLE_ENDIAN
    ch.writeByteOrder = ByteOrder.BIG_ENDIAN

    // Written before the join starts, big endian.
    ch.writeInt(0x11223344)

    launch(coroutineContext) {
        ch.joinTo(destination, closeOnEnd = true)
    }

    yield()

    // Written after the join started: first big endian, then little endian.
    ch.writeInt(0x55667788)
    ch.writeByteOrder = ByteOrder.LITTLE_ENDIAN
    ch.writeInt(0x0abbccdd)
    ch.close()

    destination.readByteOrder = ByteOrder.BIG_ENDIAN
    val firstBigEndian = destination.readInt()
    assertEquals(0x11223344, firstBigEndian)
    val secondBigEndian = destination.readInt()
    assertEquals(0x55667788, secondBigEndian)

    destination.readByteOrder = ByteOrder.LITTLE_ENDIAN
    val littleEndian = destination.readInt()
    assertEquals(0x0abbccdd, littleEndian)
}
1321
/**
 * Starts two concurrent `readInt` calls on the empty `ch`.
 * The first one is expected to end with [ClosedReceiveChannelException], and only
 * after the phase counter has been set to 1, i.e. right before `ch` is closed.
 * The second one is expected to fail with [IllegalStateException].
 */
@Test
fun testReadThenRead() = runTest {
    val phase = AtomicInteger(0)

    val eofReader = launch(coroutineContext) {
        try {
            ch.readInt()
            fail("EOF expected")
        } catch (expected: ClosedReceiveChannelException) {
            assertEquals(1, phase.get())
        }
    }

    yield()

    val rejectedReader = launch(coroutineContext) {
        try {
            ch.readInt()
            fail("Should fail with ISE")
        } catch (expected: IllegalStateException) {
            // This is the outcome the test expects.
        }
    }

    yield()
    phase.set(1)
    ch.close()

    yield()

    // Rethrow whatever either reader failed with.
    eofReader.invokeOnCompletion { cause ->
        if (cause != null) throw cause
    }
    rejectedReader.invokeOnCompletion { cause ->
        if (cause != null) throw cause
    }
}
1358
/**
 * Stress test: for each iteration a fresh channel gets one writer coroutine
 * (write a long, then close) and one reader coroutine (read a long), both launched
 * without an explicit context, and both are joined before the next iteration.
 */
@Test
fun writeThenReadStress() = runTest {
    // The shared channel is not used by this test.
    ch.close()

    repeat(50_000 * stressTestMultiplier) {
        val channel = ByteBufferChannel(autoFlush = false, pool = pool)

        val writerJob = launch {
            channel.writeLong(1)
            channel.close()
        }
        val readerJob = launch {
            channel.readLong()
        }

        writerJob.join()
        readerJob.join()
    }
}
1378
/**
 * Stress test: repeatedly joins a fresh, empty channel to `ch` and closes that
 * channel after a single yield, without ever writing to it.
 */
@Test
fun joinToEmptyStress() = runTest {
    repeat(50_000 * stressTestMultiplier) {
        val emptySource = ByteBufferChannel(autoFlush = false, pool = pool)

        launch(coroutineContext) {
            emptySource.joinTo(ch, closeOnEnd = true)
        }

        yield()

        emptySource.close()
    }
}
1393
/**
 * Stress test: each iteration writes a distinct long to a fresh child channel from a
 * separately launched coroutine, joins the child to `ch` and reads the value back.
 * After the last iteration `ch` must have nothing available for read.
 */
@Test
fun testJoinToStress() = runTest {
    for (iteration in 1..10000 * stressTestMultiplier) {
        val expected = 999 + iteration.toLong()
        val child = ByteBufferChannel(autoFlush = false, pool = pool)
        val writerJob = launch {
            child.writeLong(expected)
            child.close()
        }

        child.joinTo(ch, closeOnEnd = false)
        assertEquals(expected, ch.readLong())
        writerJob.join()
    }

    assertEquals(0, ch.availableForRead)
    ch.close()
}
1411
/**
 * A pipeline coroutine joins `steps` child channels to `ch` one after another,
 * each child carrying its own step number. The test coroutine must read the
 * step numbers from `ch` in ascending order.
 */
@Test
fun testSequentialJoin() = runTest {
    val steps = 20_000 * stressTestMultiplier

    val pipeline = launch(coroutineContext) {
        for (step in 1..steps) {
            val child = ByteBufferChannel(autoFlush = false, pool = pool)
            launch(coroutineContext) {
                child.writeInt(step)
                child.close()
            }
            child.joinTo(ch, closeOnEnd = false)
        }
    }

    for (expected in 1..steps) {
        val actual = ch.readInt()
        assertEquals(expected, actual)
    }

    pipeline.join()
    // Rethrow the pipeline's failure cause, if it has one.
    pipeline.invokeOnCompletion { cause ->
        if (cause != null) throw cause
    }
}
1436
/**
 * A pipeline coroutine joins `steps` child channels to `ch` one after another,
 * yielding once between launching each child's writer and joining the child.
 * The test coroutine must read the step numbers from `ch` in ascending order.
 */
@Test
fun testSequentialJoinYield() = runTest {
    val steps = 20_000 * stressTestMultiplier

    val pipeline = launch(coroutineContext) {
        for (step in 1..steps) {
            val child = ByteBufferChannel(autoFlush = false, pool = pool)
            launch(coroutineContext) {
                child.writeInt(step)
                child.close()
            }
            // Give other coroutines a chance to run before the join starts.
            yield()
            child.joinTo(ch, closeOnEnd = false)
        }
    }

    for (expected in 1..steps) {
        val actual = ch.readInt()
        assertEquals(expected, actual)
    }

    pipeline.join()
    // Rethrow the pipeline's failure cause, if it has one.
    pipeline.invokeOnCompletion { cause ->
        if (cause != null) throw cause
    }
}
1462
/**
 * Joins a source to `ch` with `flushOnEnd = false`: once the join returns, `ch` must
 * report 0 bytes available for read, and 4 bytes after an explicit flush.
 */
@Test
fun testJoinToNoFlush() = runTest {
    val source = ByteChannel(false)
    launch(coroutineContext) {
        source.joinTo(ch, closeOnEnd = false, flushOnEnd = false)

        val availableBeforeFlush = ch.availableForRead
        assertEquals(0, availableBeforeFlush)

        ch.flush()

        val availableAfterFlush = ch.availableForRead
        assertEquals(4, availableAfterFlush)
    }
    yield()

    source.writeInt(777)
    source.close()
}
1477
/**
 * Exercises the block-based `read(min, block)` API: a reader coroutine keeps calling
 * `ch.read(0, ...)` until `ch` is closed for read, consuming every buffer it is handed.
 * The reader is started (and yielded to) before "OK\n" is written and `ch` is closed.
 *
 * The consumed byte count is verified at the end; previously it was accumulated but
 * never checked, so the test could not detect lost or duplicated bytes.
 */
@Test
fun testReadBlock() = runTest {
    var bytesRead = 0L

    // Counts everything the buffer offers and marks it as consumed.
    val consumeAll: (ByteBuffer) -> Unit = { bb ->
        bytesRead += bb.remaining()
        bb.position(bb.limit())
    }

    val readerJob = launch(coroutineContext) {
        while (!ch.isClosedForRead) {
            ch.read(0, consumeAll)
        }
    }

    yield()

    ch.writeStringUtf8("OK\n")
    ch.close()

    readerJob.join()
    // Rethrow the reader's failure cause, if it has one.
    readerJob.invokeOnCompletion { cause ->
        cause?.let { throw it }
    }

    // "OK\n" is three single-byte UTF-8 characters.
    assertEquals(3L, bytesRead, "All written bytes should have been consumed by the read block")
}
1503
/**
 * Variant of the block-based `read(min, block)` test in which "OK\n" is written
 * before the first yield, and `ch` is closed only after that yield.
 *
 * The consumed byte count is verified at the end; previously it was accumulated but
 * never checked, so the test could not detect lost or duplicated bytes.
 */
@Test
fun testReadBlock2() = runTest {
    var bytesRead = 0L

    // Counts everything the buffer offers and marks it as consumed.
    val consumeAll: (ByteBuffer) -> Unit = { bb ->
        bytesRead += bb.remaining()
        bb.position(bb.limit())
    }

    val readerJob = launch(coroutineContext) {
        while (!ch.isClosedForRead) {
            ch.read(0, consumeAll)
        }
    }

    ch.writeStringUtf8("OK\n")
    yield()
    ch.close()

    readerJob.join()
    // Rethrow the reader's failure cause, if it has one.
    readerJob.invokeOnCompletion { cause ->
        cause?.let { throw it }
    }

    // "OK\n" is three single-byte UTF-8 characters.
    assertEquals(3L, bytesRead, "All written bytes should have been consumed by the read block")
}
1528
/**
 * Starts a `writer` coroutine that only sleeps in a long `delay`, cancels its channel
 * and then joins the coroutine. The test passes when that join returns.
 */
@Test
fun testCancelWriter() = runTest {
    val sleepingWriter = writer(DefaultDispatcher) {
        delay(1_000_000L)
    }

    sleepingWriter.channel.cancel()
    sleepingWriter.join()
}
1538
/**
 * Starts a `reader` coroutine that only sleeps in a long `delay`, closes its channel
 * with a [CancellationException] and then joins the coroutine. The test passes when
 * that join returns.
 */
@Test
fun testCancelReader() = runTest {
    val sleepingReader = reader(DefaultDispatcher) {
        delay(10_000_000L)
    }

    val closeCause = CancellationException()
    sleepingReader.channel.close(closeCause)
    sleepingReader.join()
}
Sergey Mashkovde8e44f2017-12-05 15:41:23 +03001548
/**
 * Smoke test for `writeSuspendSession`: a first session only requests a buffer,
 * a second session writes one byte. The byte must become available for read only
 * after an explicit flush.
 */
@Test
fun testWriteSuspendSessionSmokeTest() = runTest {
    // Session 1: a one-byte request must yield a buffer; nothing is written.
    ch.writeSuspendSession {
        val requested = request(1)
        assertNotNull(requested)
    }

    // Session 2: put a single byte and report it as written.
    ch.writeSuspendSession {
        val target = request(1)!!
        target.put(0x11)
        written(1)
    }

    assertEquals(0, ch.availableForRead)
    ch.flush()
    assertEquals(1, ch.availableForRead)
    assertEquals(0x11, ch.readByte())
}
1567
/**
 * Runs a `writeSuspendSession` on `ch` after `ch` has been joined to another channel.
 * The written byte must show up on the joined channel only after `ch` is flushed.
 */
@Test
fun testWriteSuspendSessionJoined() = runTest {
    val joined = ByteChannel()
    launch(Unconfined) {
        ch.joinTo(joined, closeOnEnd = true)
    }

    yield()

    ch.writeSuspendSession {
        val target = request(1)
        assertNotNull(target)
        target!!.put(0x11)
        written(1)
    }

    assertEquals(0, joined.availableForRead)
    ch.flush()
    assertEquals(1, joined.availableForRead)
    assertEquals(0x11, joined.readByte())
}
1589
/**
 * Starts `joinTo` in the middle of a `writeSuspendSession`.
 * After the join has been launched, the next `request(1)` is expected to return null;
 * following `tryAwait(1)` a buffer must be available again. Both bytes written in the
 * session must be read, in order, from the joined channel after a flush.
 */
@Test
fun testWriteSuspendSessionJoinDuringWrite() = runTest {
    val joined = ByteChannel()

    ch.writeSuspendSession {
        val firstBuffer = request(1)
        assertNotNull(firstBuffer)
        firstBuffer!!.put(0x11)
        written(1)

        launch(Unconfined) {
            ch.joinTo(joined, closeOnEnd = true)
        }

        yield()

        // At this point a request is expected to yield no buffer until awaited.
        assertNull(request(1))
        tryAwait(1)

        val secondBuffer = request(1)
        assertNotNull(secondBuffer)
        secondBuffer!!.put(0x22)
        written(1)
    }

    ch.flush()

    assertEquals(2, joined.availableForRead)
    assertEquals(0x11, joined.readByte())
    assertEquals(0x22, joined.readByte())
}
1620
/**
 * Starts `joinTo` while a `writeFully` of 65536 random bytes into `ch` is in progress.
 * The destination must receive exactly the bytes that were written.
 * `expect`/`finish` pin the order of the numbered checkpoints.
 */
@Test
fun testJoiningDuringWriteFully() = runTest {
    val payload = ByteArray(65536).also { Random().nextBytes(it) }
    val dest = ByteChannel()

    launch(coroutineContext) {
        expect(1)
        ch.writeFully(payload)
        ch.flush()
    }
    yield()
    expect(2)

    // The joining coroutine is launched without an explicit context.
    launch {
        expect(3)
        ch.joinTo(dest, closeOnEnd = false)
    }
    yield()

    val received = ByteArray(payload.size)
    dest.readFully(received)

    Assert.assertArrayEquals(payload, received)
    finish(4)
}
1647
/**
 * Builds a [ByteReadPacket] by running [block] against a fresh packet builder that is
 * backed by `pktPool`.
 *
 * If [block] or the final `build()` throws, the builder is released and the
 * throwable is rethrown unchanged.
 */
private inline fun buildPacket(block: ByteWritePacket.() -> Unit): ByteReadPacket {
    val packetBuilder = BytePacketBuilder(0, pktPool)
    return try {
        block(packetBuilder)
        packetBuilder.build()
    } catch (cause: Throwable) {
        // Release the builder on failure before propagating the error.
        packetBuilder.release()
        throw cause
    }
}
Sergey Mashkov86f70612017-07-25 10:59:44 +03001658}