blob: 23a63b62c92a05772a591f9d7b0ca25267a58bc0 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental.rx1
18
19import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030020import kotlinx.coroutines.experimental.CancellationException
21import org.hamcrest.core.*
22import org.junit.*
23import org.junit.Assert.*
24import rx.*
25import java.util.concurrent.*
26import kotlin.coroutines.experimental.*
Roman Elizarov331750b2017-02-15 17:59:17 +030027
28/**
29 * Tests emitting single item with [rxSingle].
30 */
31class SingleTest : TestBase() {
Roman Elizarov45181042017-07-20 20:37:51 +030032 @Before
33 fun setup() {
34 ignoreLostThreads("RxComputationScheduler-", "RxIoScheduler-")
35 }
36
Roman Elizarov331750b2017-02-15 17:59:17 +030037 @Test
38 fun testBasicSuccess() = runBlocking<Unit> {
39 expect(1)
Roman Elizarov43e3af72017-07-21 16:01:31 +030040 val single = rxSingle(coroutineContext) {
Roman Elizarov331750b2017-02-15 17:59:17 +030041 expect(4)
42 "OK"
43 }
44 expect(2)
45 single.subscribe { value ->
46 expect(5)
47 assertThat(value, IsEqual("OK"))
48 }
49 expect(3)
50 yield() // to started coroutine
51 finish(6)
52 }
53
54 @Test
55 fun testBasicFailure() = runBlocking<Unit> {
56 expect(1)
Roman Elizarov43e3af72017-07-21 16:01:31 +030057 val single = rxSingle(coroutineContext) {
Roman Elizarov331750b2017-02-15 17:59:17 +030058 expect(4)
59 throw RuntimeException("OK")
60 }
61 expect(2)
62 single.subscribe({
63 expectUnreached()
64 }, { error ->
65 expect(5)
66 assertThat(error, IsInstanceOf(RuntimeException::class.java))
67 assertThat(error.message, IsEqual("OK"))
68 })
69 expect(3)
70 yield() // to started coroutine
71 finish(6)
72 }
73
74
75 @Test
76 fun testBasicUnsubscribe() = runBlocking<Unit> {
77 expect(1)
Roman Elizarov43e3af72017-07-21 16:01:31 +030078 val single = rxSingle(coroutineContext) {
Roman Elizarov331750b2017-02-15 17:59:17 +030079 expect(4)
80 yield() // back to main, will get cancelled
81 expectUnreached()
82 }
83 expect(2)
84 val sub = single.subscribe({
85 expectUnreached()
86 }, { error ->
87 expect(6)
88 assertThat(error, IsInstanceOf(CancellationException::class.java))
89 })
90 expect(3)
91 yield() // to started coroutine
92 expect(5)
93 sub.unsubscribe() // will cancel coroutine
94 yield()
95 finish(7)
96 }
97
98 @Test
99 fun testSingleNoWait() {
100 val single = rxSingle(CommonPool) {
101 "OK"
102 }
103
104 checkSingleValue(single) {
105 assertThat(it, IsEqual("OK"))
106 }
107 }
108
109 @Test
110 fun testSingleNullNoWait() {
111 val single = rxSingle<String?>(CommonPool) {
112 null
113 }
114
115 checkSingleValue(single) {
116 assertThat(it, IsNull())
117 }
118 }
119
120 @Test
121 fun testSingleAwait() = runBlocking {
122 assertEquals("OK", Single.just("O").await() + "K")
123 }
124
125 @Test
126 fun testSingleEmitAndAwait() {
127 val single = rxSingle(CommonPool) {
128 Single.just("O").await() + "K"
129 }
130
131 checkSingleValue(single) {
132 assertThat(it, IsEqual("OK"))
133 }
134 }
135
136 @Test
137 fun testSingleWithDelay() {
138 val single = rxSingle(CommonPool) {
139 Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
140 }
141
142 checkSingleValue(single) {
143 assertThat(it, IsEqual("OK"))
144 }
145 }
146
147 @Test
148 fun testSingleException() {
149 val single = rxSingle(CommonPool) {
150 Observable.just("O", "K").awaitSingle() + "K"
151 }
152
153 checkErroneous(single) {
154 assertThat(it, IsInstanceOf(IllegalArgumentException::class.java))
155 }
156 }
157
158 @Test
159 fun testAwaitFirst() {
160 val single = rxSingle(CommonPool) {
161 Observable.just("O", "#").awaitFirst() + "K"
162 }
163
164 checkSingleValue(single) {
165 assertThat(it, IsEqual("OK"))
166 }
167 }
168
169 @Test
170 fun testAwaitLast() {
171 val single = rxSingle(CommonPool) {
172 Observable.just("#", "O").awaitLast() + "K"
173 }
174
175 checkSingleValue(single) {
176 assertThat(it, IsEqual("OK"))
177 }
178 }
179
180 @Test
181 fun testExceptionFromObservable() {
182 val single = rxSingle(CommonPool) {
183 try {
184 Observable.error<String>(RuntimeException("O")).awaitFirst()
185 } catch (e: RuntimeException) {
186 Observable.just(e.message!!).awaitLast() + "K"
187 }
188 }
189
190 checkSingleValue(single) {
191 assertThat(it, IsEqual("OK"))
192 }
193 }
194
195 @Test
196 fun testExceptionFromCoroutine() {
197 val single = rxSingle<String>(CommonPool) {
198 throw RuntimeException(Observable.just("O").awaitSingle() + "K")
199 }
200
201 checkErroneous(single) {
202 assertThat(it, IsInstanceOf(RuntimeException::class.java))
203 assertEquals("OK", it.message)
204 }
205 }
206}