blob: 5001037b71051a78ec83cbcfa88e1df786b2c7c4 [file] [log] [blame]
Sergey Mashkovee168162017-09-05 19:26:28 +03001package kotlinx.coroutines.experimental.io
2
3import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +03004import kotlinx.coroutines.experimental.io.internal.*
5import org.junit.*
Roman Elizarov75675e62017-11-30 15:14:44 +03006import org.junit.Test
Roman Elizarov9fe5f462018-02-21 19:05:52 +03007import java.io.*
8import kotlin.coroutines.experimental.*
9import kotlin.test.*
Sergey Mashkovee168162017-09-05 19:26:28 +030010
11class ReadUntilDelimiterTest : TestBase() {
12 private val source = ByteChannel(true)
13 private val nonRepeatingDelimiter = "123".toByteArray().asByteBuffer()
14 private val repeatingDelimiter = "AAA".toByteArray().asByteBuffer()
15
16 @Before
17 fun setUp() {
Sergey Mashkov8a30b452017-09-07 13:15:31 +030018 nonRepeatingDelimiter.clear()
19 repeatingDelimiter.clear()
20
Sergey Mashkovee168162017-09-05 19:26:28 +030021// Thread.sleep(5000)
22 }
23
Sergey Mashkov8a30b452017-09-07 13:15:31 +030024 @After
25 fun tearDown() {
26 source.close(CancellationException())
27 }
28
29 @Test
30 fun testReadUntilDelimiterOnClosed() = runBlocking {
31 source.close()
32 assertEquals(-1, source.readUntilDelimiter(nonRepeatingDelimiter, ByteBuffer.allocate(100)))
33 }
34
35 @Test
36 fun testReadUntilDelimiterOnEmptyThenClose() = runBlocking {
37 launch(coroutineContext) {
38 expect(2)
39 source.close()
40 }
41
42 expect(1)
43 assertEquals(-1, source.readUntilDelimiter(nonRepeatingDelimiter, ByteBuffer.allocate(100)))
44 finish(3)
45 }
46
Sergey Mashkovee168162017-09-05 19:26:28 +030047 @Test
48 fun smokeTest() = runBlocking {
49 val tmp = ByteBuffer.allocate(100)
50
51 source.writeInt(777)
52 source.writeFully(nonRepeatingDelimiter.duplicate())
53 source.writeInt(999)
54
55 val rc = source.readUntilDelimiter(nonRepeatingDelimiter, tmp)
56 assertEquals(4, rc)
57 tmp.flip()
58 assertEquals(4, tmp.remaining())
59 assertEquals(777, tmp.getInt())
60 assertEquals(0, nonRepeatingDelimiter.position())
61 assertEquals(3, nonRepeatingDelimiter.limit())
62
63 tmp.clear()
64 tmp.limit(nonRepeatingDelimiter.remaining())
65 source.readFully(tmp)
66
67 source.close()
68
69 tmp.clear()
70 val rc2 = source.readUntilDelimiter(nonRepeatingDelimiter, tmp)
71 assertEquals(4, rc2)
72 tmp.flip()
73 assertEquals(4, tmp.remaining())
74 assertEquals(999, tmp.getInt())
75 tmp.clear()
76
77 val rc3 = source.readUntilDelimiter(nonRepeatingDelimiter, tmp)
Sergey Mashkov8a30b452017-09-07 13:15:31 +030078 assertEquals(-1, rc3)
Sergey Mashkovee168162017-09-05 19:26:28 +030079 }
80
81 @Test
82 fun smokeTestWithRepeatingDelimiter() = runBlocking {
83 val tmp = ByteBuffer.allocate(100)
84
85 source.writeInt(777)
86 source.writeFully(repeatingDelimiter.duplicate())
87 source.writeInt(999)
88
89 val rc = source.readUntilDelimiter(repeatingDelimiter, tmp)
90 assertEquals(4, rc)
91 tmp.flip()
92 assertEquals(4, tmp.remaining())
93 assertEquals(777, tmp.getInt())
94 assertEquals(0, repeatingDelimiter.position())
95 assertEquals(3, repeatingDelimiter.limit())
96
97 tmp.clear()
98 tmp.limit(repeatingDelimiter.remaining())
99 source.readFully(tmp)
100
101 source.close()
102
103 tmp.clear()
104 val rc2 = source.readUntilDelimiter(repeatingDelimiter, tmp)
105 assertEquals(4, rc2)
106 tmp.flip()
107 assertEquals(4, tmp.remaining())
108 assertEquals(999, tmp.getInt())
109 tmp.clear()
110
111 val rc3 = source.readUntilDelimiter(repeatingDelimiter, tmp)
Sergey Mashkov8a30b452017-09-07 13:15:31 +0300112 assertEquals(-1, rc3)
Sergey Mashkovee168162017-09-05 19:26:28 +0300113 }
114
115 @Test
116 fun testEnsureSuspendOrder() = runBlocking {
117 launch(coroutineContext) {
118 expect(2)
119 source.writeInt(777)
120 yield()
121 expect(3)
122 source.writeInt(999)
123 yield()
124 expect(4)
125 source.writeFully(nonRepeatingDelimiter.duplicate())
126 }
127
128 expect(1)
129 val tmp = ByteBuffer.allocate(100)
130 val rc = source.readUntilDelimiter(nonRepeatingDelimiter, tmp)
131 expect(5)
132
133 assertEquals(8, rc)
134 tmp.flip()
135 assertEquals(777, tmp.getInt())
136 assertEquals(999, tmp.getInt())
137 tmp.clear()
138
139 expect(6)
140
141 assertEquals(0, source.readUntilDelimiter(nonRepeatingDelimiter, tmp))
142
Sergey Mashkov8a30b452017-09-07 13:15:31 +0300143 source.skipDelimiter(nonRepeatingDelimiter)
144
145 source.close()
146 assertEquals(-1, source.readUntilDelimiter(nonRepeatingDelimiter, tmp))
147
148
Sergey Mashkovee168162017-09-05 19:26:28 +0300149 finish(7)
150 }
151
152 @Test
153 fun testBulkWrite() = runBlocking {
154 launch(coroutineContext) {
155 expect(2)
156
157 val buffer = ByteBuffer.allocate(100)
158 buffer.putInt(777)
159 buffer.putInt(999)
160 buffer.put(nonRepeatingDelimiter.duplicate())
161 buffer.flip()
162
163 source.writeFully(buffer)
164 }
165
166 expect(1)
167 val tmp = ByteBuffer.allocate(100)
168 val rc = source.readUntilDelimiter(nonRepeatingDelimiter, tmp)
169 expect(3)
170
171 assertEquals(8, rc)
172 tmp.flip()
173 assertEquals(777, tmp.getInt())
174 assertEquals(999, tmp.getInt())
175 tmp.clear()
176
177 expect(4)
178
179 assertEquals(0, source.readUntilDelimiter(nonRepeatingDelimiter, tmp))
180
181 finish(5)
182 }
183
184 @Test
185 fun testPartitionedDelimiter() = runBlocking {
186 launch(coroutineContext) {
187 expect(2)
188
189 val buffer = ByteBuffer.allocate(100)
190 buffer.putInt(777)
191 buffer.putInt(999)
192 buffer.put(nonRepeatingDelimiter.duplicate().apply { limit(1) })
193 buffer.flip()
194
195 source.writeFully(buffer)
196
197 yield()
198 expect(3)
199
200 source.writeFully(nonRepeatingDelimiter.duplicate().apply { position(1) })
Sergey Mashkov8a30b452017-09-07 13:15:31 +0300201 source.close()
Sergey Mashkovee168162017-09-05 19:26:28 +0300202 }
203
204 expect(1)
205 val tmp = ByteBuffer.allocate(100)
206 val rc = source.readUntilDelimiter(nonRepeatingDelimiter, tmp)
207 expect(4)
208
209 assertEquals(8, rc)
210 tmp.flip()
211 assertEquals(777, tmp.getInt())
212 assertEquals(999, tmp.getInt())
213 tmp.clear()
214
215 expect(5)
216
217 assertEquals(0, source.readUntilDelimiter(nonRepeatingDelimiter, tmp))
Sergey Mashkov8a30b452017-09-07 13:15:31 +0300218 source.skipDelimiter(nonRepeatingDelimiter)
219 assertEquals(-1, source.readUntilDelimiter(nonRepeatingDelimiter, tmp))
220
Sergey Mashkovee168162017-09-05 19:26:28 +0300221
222 finish(6)
223 }
Sergey Mashkov8a30b452017-09-07 13:15:31 +0300224
225 @Test
226 fun testReadUntilDelimiterWrapped() = runBlocking {
227 val padSize = BUFFER_SIZE - 8 - 1
228
229 launch(coroutineContext) {
230 expect(2)
231 source.writeFully(ByteBuffer.allocate(padSize - 1))
232 source.writeByte(99)
233 yield()
234
235 expect(4)
236 source.writeFully(nonRepeatingDelimiter.duplicate())
237 expect(5)
238 }
239
240 expect(1)
241 source.readFully(ByteBuffer.allocate(padSize - 1))
242 expect(3)
243
244 val tmp = ByteBuffer.allocate(100)
245 val rc = source.readUntilDelimiter(nonRepeatingDelimiter, tmp)
246 assertEquals(1, rc)
247 assertEquals(99, tmp.get(0).toInt())
248
249 finish(6)
250 }
251
252 @Test
253 fun testReadUntilDelimiterRepeatedWrapped() = runBlocking {
254 val padSize = BUFFER_SIZE - 8 - 1
255
256 launch(coroutineContext) {
257 expect(2)
258 source.writeFully(ByteBuffer.allocate(padSize - 1))
259 source.writeByte(99)
260 yield()
261
262 expect(4)
263 source.writeFully(repeatingDelimiter.duplicate())
264 expect(5)
265 }
266
267 expect(1)
268 source.readFully(ByteBuffer.allocate(padSize - 1))
269 expect(3)
270
271 val tmp = ByteBuffer.allocate(100)
272 val rc = source.readUntilDelimiter(repeatingDelimiter, tmp)
273 assertEquals(1, rc)
274 assertEquals(99, tmp.get(0).toInt())
275
276 finish(6)
277 }
278
279 @Test
280 fun testReadUntilDelimiterPartialFailure() = runBlocking {
281 val padSize = BUFFER_SIZE - 8 - 1
282
283 launch(coroutineContext) {
284 expect(2)
285 source.writeFully(ByteBuffer.allocate(padSize - 1))
286 source.writeByte(99)
287 yield()
288
289 expect(4)
290 source.writeByte(repeatingDelimiter.get(0))
291 source.writeByte(999)
292 expect(5)
293
294 yield()
295 expect(6)
296 source.close()
297 }
298
299 expect(1)
300 source.readFully(ByteBuffer.allocate(padSize - 1))
301 expect(3)
302
303 val tmp = ByteBuffer.allocate(100)
304 val rc = source.readUntilDelimiter(repeatingDelimiter, tmp)
305 expect(7)
306 assertEquals(3, rc)
307 assertEquals(99, tmp.get(0).toInt())
308
309 finish(8)
310 }
311
312 @Test
313 fun testReadUntilDelimiterPartialFailure2() = runBlocking {
314 val padSize = BUFFER_SIZE - 8 - 1
315
316 launch(coroutineContext) {
317 expect(2)
318 source.writeFully(ByteBuffer.allocate(padSize - 1))
319 source.writeByte(99)
320 yield()
321
322 expect(4)
323 source.writeByte(repeatingDelimiter.get(0))
324 source.writeByte(88)
325 source.writeByte(77)
326 expect(5)
327
328 yield()
329 expect(6)
330 source.close()
331 }
332
333 expect(1)
334 source.readFully(ByteBuffer.allocate(padSize - 1))
335 expect(3)
336
337 val tmp = ByteBuffer.allocate(100)
338 val rc = source.readUntilDelimiter(repeatingDelimiter, tmp)
339 expect(7)
340 assertEquals(4, rc)
341 tmp.flip()
342 assertEquals(99, tmp.get().toInt())
343 assertEquals(repeatingDelimiter.get(0), tmp.get())
344 assertEquals(88, tmp.get().toInt())
345 assertEquals(77, tmp.get().toInt())
346
347 finish(8)
348 }
349
350 @Test
351 fun testReadUntilDelimiterWrappedNotEnoughThenFailure() = runBlocking {
352 val padSize = BUFFER_SIZE - 8 - 1
353
354 launch(coroutineContext) {
355 expect(2)
356 source.writeFully(ByteBuffer.allocate(padSize - 1))
357 source.writeByte(99)
358 yield()
359
360 expect(4)
361 assertTrue { repeatingDelimiter.remaining() > 2 }
362 source.writeFully(repeatingDelimiter.duplicate().apply { limit(limit() - 1) })
363 expect(5)
364
365 yield()
366 expect(6)
367 source.close()
368 }
369
370 expect(1)
371 source.readFully(ByteBuffer.allocate(padSize - 1))
372 expect(3)
373
374 val tmp = ByteBuffer.allocate(100)
375 val rc = source.readUntilDelimiter(repeatingDelimiter, tmp)
376 expect(7)
377 assertEquals(3, rc)
378 tmp.flip()
379 assertEquals(99, tmp.get().toInt())
380 for (i in 0 until repeatingDelimiter.remaining() - 1) {
381 assertEquals(repeatingDelimiter.get(i), tmp.get())
382 }
383
384 finish(8)
385 }
386
387 @Test
388 fun testSkipDelimiterSuspend() = runBlocking {
389 launch(coroutineContext) {
390 expect(2)
391 source.writeFully(nonRepeatingDelimiter.duplicate())
392 }
393
394 expect(1)
395 source.skipDelimiter(nonRepeatingDelimiter)
396 finish(3)
397 }
398
399 @Test
400 fun testSkipDelimiterFullyAvailable() = runBlocking {
401 launch(coroutineContext) {
402 expect(2)
403 source.writeFully(nonRepeatingDelimiter.duplicate())
404 expect(3)
405 }
406
407 expect(1)
408 yield()
409 expect(4)
410 source.skipDelimiter(nonRepeatingDelimiter)
411 finish(5)
412 }
413
414 @Test
415 fun testSkipDelimiterSuspendMultiple() = runBlocking {
416 launch(coroutineContext) {
417 expect(2)
418 source.writeFully(nonRepeatingDelimiter.duplicate().apply { limit(1) })
419 yield()
420 expect(3)
421 source.writeFully(nonRepeatingDelimiter.duplicate().apply { position(1) })
422 }
423
424 expect(1)
425 yield()
426 source.skipDelimiter(nonRepeatingDelimiter)
427 finish(4)
428 }
429
430 @Test
431 fun testSkipDelimiterSuspendRingBufferWrap() = runBlocking {
432 launch(coroutineContext) {
433 expect(2)
434 source.writeFully(ByteBuffer.allocate(BUFFER_SIZE - 9))
435 yield()
436
437 expect(4)
438 source.writeFully(nonRepeatingDelimiter.duplicate())
439 yield()
440 }
441
442 expect(1)
443 source.readFully(ByteBuffer.allocate(BUFFER_SIZE - 9))
444 expect(3)
445
446 source.skipDelimiter(nonRepeatingDelimiter)
447 finish(5)
448 }
449
450 @Test
451 fun testSkipDelimiterBroken() = runBlocking {
452 launch(coroutineContext) {
453 expect(2)
454 val bb = ByteBuffer.allocate(nonRepeatingDelimiter.remaining())
455 bb.put(nonRepeatingDelimiter.duplicate())
456 bb.put(1, (bb.get(1) + 1).toByte())
457 bb.clear()
458 source.writeFully(bb)
459 expect(3)
460 }
461
462 expect(1)
463 yield()
464 expect(4)
465
466 try {
467 source.skipDelimiter(nonRepeatingDelimiter)
468 fail()
469 } catch (expected: IOException) {
470 }
471
472 finish(5)
473 }
474
475
Sergey Mashkovee168162017-09-05 19:26:28 +0300476}