blob: 4deb5311628812b6ead04bd1ee04015a4aa814df [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
5package kotlinx.coroutines.experimental.rx1
6
7import kotlinx.coroutines.experimental.CommonPool
8import kotlinx.coroutines.experimental.runBlocking
9import org.junit.Assert.assertEquals
10import org.junit.Assert.fail
11import org.junit.Test
12import rx.Observable
13import java.util.concurrent.TimeUnit
14
15/**
16 * Tests emitting single item with [rxObservable].
17 */
18class ObservableSingleTest {
19 @Test
20 fun testSingleNoWait() {
21 val observable = rxObservable(CommonPool) {
22 send("OK")
23 }
24
25 checkSingleValue(observable) {
26 assertEquals("OK", it)
27 }
28 }
29
30 @Test
31 fun testSingleNullNoWait() {
32 val observable = rxObservable<String?>(CommonPool) {
33 send(null)
34 }
35
36 checkSingleValue(observable) {
37 assertEquals(null, it)
38 }
39 }
40
41 @Test
42 fun testSingleAwait() = runBlocking {
43 assertEquals("OK", Observable.just("O").awaitSingle() + "K")
44 }
45
46 @Test
47 fun testSingleEmitAndAwait() {
48 val observable = rxObservable(CommonPool) {
49 send(Observable.just("O").awaitSingle() + "K")
50 }
51
52 checkSingleValue(observable) {
53 assertEquals("OK", it)
54 }
55 }
56
57 @Test
58 fun testSingleWithDelay() {
59 val observable = rxObservable(CommonPool) {
60 send(Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K")
61 }
62
63 checkSingleValue(observable) {
64 assertEquals("OK", it)
65 }
66 }
67
68 @Test
69 fun testSingleException() {
70 val observable = rxObservable(CommonPool) {
71 send(Observable.just("O", "K").awaitSingle() + "K")
72 }
73
74 checkErroneous(observable) {
75 assert(it is IllegalArgumentException)
76 }
77 }
78
79 @Test
80 fun testAwaitFirst() {
81 val observable = rxObservable(CommonPool) {
Roman Elizarov4a67afb2017-03-16 11:10:47 +030082 send(Observable.just("O", "#").awaitFirst() + "K")
83 }
84
85 checkSingleValue(observable) {
86 assertEquals("OK", it)
87 }
88 }
89
90 @Test
91 fun testAwaitFirstOrDefault() {
92 val observable = rxObservable(CommonPool) {
Kirill Rakhmanff4c7c32017-03-15 13:19:56 +010093 send(Observable.empty<String>().awaitFirstOrDefault("O") + "K")
94 }
95
96 checkSingleValue(observable) {
97 assertEquals("OK", it)
98 }
99 }
100
Kirill Rakhmanff4c7c32017-03-15 13:19:56 +0100101 @Test
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300102 fun testAwaitFirstOrDefaultWithValues() {
Kirill Rakhmanff4c7c32017-03-15 13:19:56 +0100103 val observable = rxObservable(CommonPool) {
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300104 send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K")
Roman Elizarov331750b2017-02-15 17:59:17 +0300105 }
106
107 checkSingleValue(observable) {
108 assertEquals("OK", it)
109 }
110 }
111
112 @Test
Konrad KamiƄskib8ed47c2018-02-06 12:17:53 +0100113 fun testAwaitFirstOrNull() {
114 val observable = rxObservable<String>(CommonPool) {
115 send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
116 }
117
118 checkSingleValue(observable) {
119 assertEquals("OK", it)
120 }
121 }
122
123 @Test
124 fun testAwaitFirstOrNullWithValues() {
125 val observable = rxObservable(CommonPool) {
126 send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
127 }
128
129 checkSingleValue(observable) {
130 assertEquals("OK", it)
131 }
132 }
133
134 @Test
135 fun testAwaitFirstOrElse() {
136 val observable = rxObservable(CommonPool) {
137 send(Observable.empty<String>().awaitFirstOrElse { "O" } + "K")
138 }
139
140 checkSingleValue(observable) {
141 assertEquals("OK", it)
142 }
143 }
144
145 @Test
146 fun testAwaitFirstOrElseWithValues() {
147 val observable = rxObservable(CommonPool) {
148 send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K")
149 }
150
151 checkSingleValue(observable) {
152 assertEquals("OK", it)
153 }
154 }
155
156 @Test
Roman Elizarov331750b2017-02-15 17:59:17 +0300157 fun testAwaitLast() {
158 val observable = rxObservable(CommonPool) {
159 send(Observable.just("#", "O").awaitLast() + "K")
160 }
161
162 checkSingleValue(observable) {
163 assertEquals("OK", it)
164 }
165 }
166
167 @Test
168 fun testExceptionFromObservable() {
169 val observable = rxObservable(CommonPool) {
170 try {
171 send(Observable.error<String>(RuntimeException("O")).awaitFirst())
172 } catch (e: RuntimeException) {
173 send(Observable.just(e.message!!).awaitLast() + "K")
174 }
175 }
176
177 checkSingleValue(observable) {
178 assertEquals("OK", it)
179 }
180 }
181
182 @Test
183 fun testExceptionFromCoroutine() {
184 val observable = rxObservable<String>(CommonPool) {
185 error(Observable.just("O").awaitSingle() + "K")
186 }
187
188 checkErroneous(observable) {
189 assert(it is IllegalStateException)
190 assertEquals("OK", it.message)
191 }
192 }
193
194 @Test
195 fun testObservableIteration() {
196 val observable = rxObservable(CommonPool) {
197 var result = ""
Roman Elizarov86349be2017-03-17 16:47:37 +0300198 Observable.just("O", "K").consumeEach {result += it }
Roman Elizarov331750b2017-02-15 17:59:17 +0300199 send(result)
200 }
201
202 checkSingleValue(observable) {
203 assertEquals("OK", it)
204 }
205 }
206
207 @Test
208 fun testObservableIterationFailure() {
209 val observable = rxObservable(CommonPool) {
210 try {
Roman Elizarov86349be2017-03-17 16:47:37 +0300211 Observable.error<String>(RuntimeException("OK")).consumeEach { fail("Should not be here") }
Roman Elizarov331750b2017-02-15 17:59:17 +0300212 send("Fail")
213 } catch (e: RuntimeException) {
214 send(e.message!!)
215 }
216 }
217
218 checkSingleValue(observable) {
219 assertEquals("OK", it)
220 }
221 }
222}