blob: 08bbf476f6fc10757234b36cb245f805029889d6 [file] [log] [blame]
Konrad Kamiński3ae898c2017-03-30 17:37:00 +02001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Konrad Kamiński3ae898c2017-03-30 17:37:00 +02003 */
4
5package kotlinx.coroutines.experimental.reactor
6
7import kotlinx.coroutines.experimental.CommonPool
Sebastien Deleuzeac8b6da2017-10-06 15:13:15 +02008import kotlinx.coroutines.experimental.reactive.*
Konrad Kamiński3ae898c2017-03-30 17:37:00 +02009import kotlinx.coroutines.experimental.runBlocking
10import org.junit.Assert.assertEquals
11import org.junit.Assert.fail
12import org.junit.Test
13import reactor.core.publisher.Flux
Sebastien Deleuzeac8b6da2017-10-06 15:13:15 +020014import java.time.Duration.ofMillis
Konrad Kamiński3ae898c2017-03-30 17:37:00 +020015
16/**
17 * Tests emitting single item with [flux].
18 */
19class FluxSingleTest {
20 @Test
21 fun testSingleNoWait() {
22 val flux = flux(CommonPool) {
23 send("OK")
24 }
25
26 checkSingleValue(flux) {
27 assertEquals("OK", it)
28 }
29 }
30
31 @Test
32 fun testSingleAwait() = runBlocking {
33 assertEquals("OK", Flux.just("O").awaitSingle() + "K")
34 }
35
36 @Test
37 fun testSingleEmitAndAwait() {
38 val flux = flux(CommonPool) {
39 send(Flux.just("O").awaitSingle() + "K")
40 }
41
42 checkSingleValue(flux) {
43 assertEquals("OK", it)
44 }
45 }
46
47 @Test
48 fun testSingleWithDelay() {
49 val flux = flux(CommonPool) {
Sebastien Deleuzeac8b6da2017-10-06 15:13:15 +020050 send(Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K")
Konrad Kamiński3ae898c2017-03-30 17:37:00 +020051 }
52
53 checkSingleValue(flux) {
54 assertEquals("OK", it)
55 }
56 }
57
58 @Test
59 fun testSingleException() {
60 val flux = flux(CommonPool) {
61 send(Flux.just("O", "K").awaitSingle() + "K")
62 }
63
64 checkErroneous(flux) {
65 assert(it is IllegalArgumentException)
66 }
67 }
68
69 @Test
70 fun testAwaitFirst() {
71 val flux = flux(CommonPool) {
72 send(Flux.just("O", "#").awaitFirst() + "K")
73 }
74
75 checkSingleValue(flux) {
76 assertEquals("OK", it)
77 }
78 }
79
80 @Test
81 fun testAwaitFirstOrDefault() {
82 val flux = flux(CommonPool) {
83 send(Flux.empty<String>().awaitFirstOrDefault("O") + "K")
84 }
85
86 checkSingleValue(flux) {
87 assertEquals("OK", it)
88 }
89 }
90
91 @Test
92 fun testAwaitFirstOrDefaultWithValues() {
93 val flux = flux(CommonPool) {
94 send(Flux.just("O", "#").awaitFirstOrDefault("!") + "K")
95 }
96
97 checkSingleValue(flux) {
98 assertEquals("OK", it)
99 }
100 }
101
102 @Test
Konrad Kamińskib8ed47c2018-02-06 12:17:53 +0100103 fun testAwaitFirstOrNull() {
104 val flux = flux<String>(CommonPool) {
105 send(Flux.empty<String>().awaitFirstOrNull() ?: "OK")
106 }
107
108 checkSingleValue(flux) {
109 assertEquals("OK", it)
110 }
111 }
112
113 @Test
114 fun testAwaitFirstOrNullWithValues() {
115 val flux = flux(CommonPool) {
116 send((Flux.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
117 }
118
119 checkSingleValue(flux) {
120 assertEquals("OK", it)
121 }
122 }
123
124 @Test
125 fun testAwaitFirstOrElse() {
126 val flux = flux(CommonPool) {
127 send(Flux.empty<String>().awaitFirstOrElse { "O" } + "K")
128 }
129
130 checkSingleValue(flux) {
131 assertEquals("OK", it)
132 }
133 }
134
135 @Test
136 fun testAwaitFirstOrElseWithValues() {
137 val flux = flux(CommonPool) {
138 send(Flux.just("O", "#").awaitFirstOrElse { "!" } + "K")
139 }
140
141 checkSingleValue(flux) {
142 assertEquals("OK", it)
143 }
144 }
145
146 @Test
Konrad Kamiński3ae898c2017-03-30 17:37:00 +0200147 fun testAwaitLast() {
148 val flux = flux(CommonPool) {
149 send(Flux.just("#", "O").awaitLast() + "K")
150 }
151
152 checkSingleValue(flux) {
153 assertEquals("OK", it)
154 }
155 }
156
157 @Test
158 fun testExceptionFromObservable() {
159 val flux = flux(CommonPool) {
160 try {
161 send(Flux.error<String>(RuntimeException("O")).awaitFirst())
162 } catch (e: RuntimeException) {
163 send(Flux.just(e.message!!).awaitLast() + "K")
164 }
165 }
166
167 checkSingleValue(flux) {
168 assertEquals("OK", it)
169 }
170 }
171
172 @Test
173 fun testExceptionFromCoroutine() {
174 val flux = flux<String>(CommonPool) {
175 error(Flux.just("O").awaitSingle() + "K")
176 }
177
178 checkErroneous(flux) {
179 assert(it is IllegalStateException)
180 assertEquals("OK", it.message)
181 }
182 }
183
184 @Test
185 fun testFluxIteration() {
186 val flux = flux(CommonPool) {
187 var result = ""
188 Flux.just("O", "K").consumeEach { result += it }
189 send(result)
190 }
191
192 checkSingleValue(flux) {
193 assertEquals("OK", it)
194 }
195 }
196
197 @Test
198 fun testFluxIterationFailure() {
199 val flux = flux(CommonPool) {
200 try {
201 Flux.error<String>(RuntimeException("OK")).consumeEach { fail("Should not be here") }
202 send("Fail")
203 } catch (e: RuntimeException) {
204 send(e.message!!)
205 }
206 }
207
208 checkSingleValue(flux) {
209 assertEquals("OK", it)
210 }
211 }
212}