blob: a66d6bffd686186dc823a167b13ebbd9fab5149d [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental.reactor
18
Roman Elizarovffc61ae2017-10-26 19:29:52 +030019import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030020import kotlinx.coroutines.experimental.reactive.*
21import org.hamcrest.core.*
22import org.junit.*
23import org.junit.Assert.*
24import reactor.core.publisher.*
25import java.time.Duration.*
26import kotlin.coroutines.experimental.*
Konrad Kamiński3ae898c2017-03-30 17:37:00 +020027
/**
 * Tests emitting single item with [mono].
 *
 * The first group of tests runs the `mono` coroutine in the context of the enclosing
 * `runBlocking` and uses the numbered `expect(n)` / `finish(n)` checkpoints to pin the
 * order in which the test body, the coroutine and the subscriber callbacks run.
 * NOTE(review): `expect`, `finish`, `expectUnreached` and `ignoreLostThreads` are not defined
 * in this file — presumably inherited from [TestBase]; confirm there for exact semantics.
 *
 * The second group runs the coroutine on `CommonPool` and verifies the outcome through
 * `checkMonoValue` / `checkErroneous`.
 * NOTE(review): those two helpers are declared elsewhere; presumably they subscribe to /
 * block on the mono and hand the value or the error to the given lambda — confirm.
 */
class MonoTest : TestBase() {
    /**
     * Registers the "timer-" and "parallel-" thread-name prefixes with `ignoreLostThreads`.
     * NOTE(review): presumably these are scheduler threads left running by Reactor that the
     * base class would otherwise report as leaked — confirm against TestBase.
     */
    @Before
    fun setup() {
        ignoreLostThreads("timer-", "parallel-")
    }

    /** A mono whose coroutine returns a value delivers that value to the subscriber. */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val mono = mono(coroutineContext) {
            expect(4)
            "OK"
        }
        expect(2)
        mono.subscribe { value ->
            expect(5)
            Assert.assertThat(value, IsEqual("OK"))
        }
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** A mono whose coroutine throws routes the exception to the subscriber's error callback. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val mono = mono(coroutineContext) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        mono.subscribe({
            // no value must be emitted by a failing coroutine
            expectUnreached()
        }, { error ->
            expect(5)
            Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java))
            Assert.assertThat(error.message, IsEqual("OK"))
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** A mono whose coroutine returns `null` completes without emitting a value. */
    @Test
    fun testBasicEmpty() = runBlocking {
        expect(1)
        val mono = mono(coroutineContext) {
            expect(4)
            null
        }
        expect(2)
        // value callback is a no-op, errors are rethrown, completion is the expected signal
        mono.subscribe({}, { throw it }, {
            expect(5)
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** Disposing the subscription cancels the coroutine; neither callback is invoked. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val mono = mono(coroutineContext) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed mono
        val sub = mono.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** A coroutine that returns immediately produces its value. */
    @Test
    fun testMonoNoWait() {
        val mono = mono(CommonPool) {
            "OK"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` on a one-element [Mono] returns that element. */
    @Test
    fun testMonoAwait() = runBlocking {
        assertEquals("OK", Mono.just("O").awaitSingle() + "K")
    }

    /** A value awaited from another [Mono] inside the coroutine can be used to build the result. */
    @Test
    fun testMonoEmitAndAwait() {
        val mono = mono(CommonPool) {
            Mono.just("O").awaitSingle() + "K"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** Same as [testMonoEmitAndAwait], but the awaited element is delayed by 50 ms. */
    @Test
    fun testMonoWithDelay() {
        val mono = mono(CommonPool) {
            Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` on a two-element [Flux] is expected to fail with [IllegalArgumentException]. */
    @Test
    fun testMonoException() {
        val mono = mono(CommonPool) {
            Flux.just("O", "K").awaitSingle() + "K"
        }

        checkErroneous(mono) {
            // assertTrue instead of Kotlin's assert(): the latter is skipped unless the JVM runs with -ea
            assertTrue(it is IllegalArgumentException)
        }
    }

    /** `awaitFirst` returns the first element of a multi-element [Flux]. */
    @Test
    fun testAwaitFirst() {
        val mono = mono(CommonPool) {
            Flux.just("O", "#").awaitFirst() + "K"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast` returns the last element of a multi-element [Flux]. */
    @Test
    fun testAwaitLast() {
        val mono = mono(CommonPool) {
            Flux.just("#", "O").awaitLast() + "K"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** An error signalled by an awaited [Flux] can be caught inside the coroutine and recovered from. */
    @Test
    fun testExceptionFromFlux() {
        val mono = mono(CommonPool) {
            try {
                Flux.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                // message is the non-null "O" supplied just above
                Flux.just(e.message!!).awaitLast() + "K"
            }
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** An exception thrown by the coroutine after a suspension surfaces as the mono's error. */
    @Test
    fun testExceptionFromCoroutine() {
        val mono = mono<String>(CommonPool) {
            throw IllegalStateException(Flux.just("O").awaitSingle() + "K")
        }

        checkErroneous(mono) {
            // assertTrue instead of Kotlin's assert(): the latter is skipped unless the JVM runs with -ea
            assertTrue(it is IllegalStateException)
            assertEquals("OK", it.message)
        }
    }
}
210}