blob: 1f6b6ac7c2d442cc7ec5ff7ee13f81571ae1aed9 [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.experimental.reactor
6
7import kotlinx.coroutines.experimental.CommonPool
8import kotlinx.coroutines.experimental.TestBase
9import kotlinx.coroutines.experimental.Unconfined
10import kotlinx.coroutines.experimental.launch
11import kotlinx.coroutines.experimental.reactive.consumeEach
12import org.junit.Assert.assertEquals
13import org.junit.Test
14import reactor.core.publisher.Flux
15import java.io.IOException
16
/**
 * Exercises the [flux] coroutine builder with publishers that emit more than one value.
 *
 * Every test builds a publisher, derives a `Mono` from it and passes that `Mono` to `checkMonoValue`
 * together with a block asserting on the value it produces.
 */
class FluxMultiTest : TestBase() {
    /** Sends `0 until count` from a single coroutine on [CommonPool] and expects the same list back, in order. */
    @Test
    fun testNumbers() {
        val count = 100 * stressTestMultiplier
        val numbers = flux<Int>(CommonPool) {
            for (value in 0 until count) send(value)
        }
        val expected = (0 until count).toList()
        checkMonoValue(numbers.collectList()) { collected ->
            assertEquals(expected, collected)
        }
    }

    /**
     * Sends every value from its own launched coroutine, so arrival order is not fixed;
     * the collected list is therefore compared by size and by sorted content only.
     */
    @Test
    fun testConcurrentStress() {
        val count = 10_000 * stressTestMultiplier
        val numbers = flux<Int>(CommonPool) {
            // One emitter coroutine per value, all sending into the same flux.
            val emitters = List(count) { value ->
                launch(CommonPool) { send(value) }
            }
            // Wait for every emitter before the builder block returns.
            for (emitter in emitters) emitter.join()
        }
        checkMonoValue(numbers.collectList()) { collected ->
            assertEquals(count, collected.size)
            assertEquals((0 until count).toList(), collected.sorted())
        }
    }

    /** Re-sends every element of `Flux.range` through a [flux] running in the [Unconfined] context. */
    @Test
    fun testIteratorResendUnconfined() {
        val count = 10_000 * stressTestMultiplier
        val relayed = flux<Int>(Unconfined) {
            Flux.range(0, count).consumeEach { value -> send(value) }
        }
        val expected = (0 until count).toList()
        checkMonoValue(relayed.collectList()) { collected ->
            assertEquals(expected, collected)
        }
    }

    /** Same re-sending scenario as the unconfined variant, but with the [flux] running on [CommonPool]. */
    @Test
    fun testIteratorResendPool() {
        val count = 10_000 * stressTestMultiplier
        val relayed = flux<Int>(CommonPool) {
            Flux.range(0, count).consumeEach { value -> send(value) }
        }
        val expected = (0 until count).toList()
        checkMonoValue(relayed.collectList()) { collected ->
            assertEquals(expected, collected)
        }
    }

    /**
     * Sends one element and then throws from the [flux] block. The consumer appends the received
     * element and, on catching the [IOException], its message; the expected text is "OK".
     */
    @Test
    fun testSendAndCrash() {
        val crashing = flux<String>(CommonPool) {
            send("O")
            throw IOException("K")
        }
        val summary = mono(CommonPool) {
            val received = StringBuilder()
            try {
                crashing.consumeEach { element -> received.append(element) }
            } catch (failure: IOException) {
                // The exception message completes the expected text.
                received.append(failure.message)
            }
            received.toString()
        }
        checkMonoValue(summary) { text ->
            assertEquals("OK", text)
        }
    }
}
92}