blob: 226d8693ffcc800f2f0c76a76f5c6dd2f6d5d0b2 [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental.rx2
18
Roman Elizarov9fe5f462018-02-21 19:05:52 +030019import io.reactivex.*
Roman Elizarovffc61ae2017-10-26 19:29:52 +030020import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030021import org.hamcrest.core.*
22import org.junit.*
23import org.junit.Assert.*
24import java.util.concurrent.*
25import kotlin.coroutines.experimental.*
Roman Elizarov331750b2017-02-15 17:59:17 +030026
/**
 * Tests emitting a single item with [rxSingle].
 *
 * The `testBasic*` tests run inside [runBlocking] and pass its `coroutineContext` to [rxSingle];
 * the numbered `expect(n)` / `finish(n)` calls mark the order in which the steps are expected to run.
 * The remaining tests build a single on [CommonPool] and hand it to the `checkSingleValue` /
 * `checkErroneous` helpers (defined outside this file) together with a lambda that inspects
 * the resulting value or error.
 */
class SingleTest : TestBase() {
    /**
     * Passes the RxJava thread-name prefixes to `ignoreLostThreads` before every test.
     * NOTE(review): presumably this keeps the base class from reporting RxJava's pooled
     * threads as leaked -- confirm against `TestBase`.
     */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * A single whose block returns "OK" delivers that value to the subscriber.
     */
    @Test
    fun testBasicSuccess() = runBlocking<Unit> {
        expect(1)
        val single = rxSingle(coroutineContext) {
            expect(4)
            "OK"
        }
        expect(2)
        single.subscribe { value ->
            expect(5)
            Assert.assertThat(value, IsEqual("OK"))
        }
        expect(3)
        yield() // let the started coroutine run
        finish(6)
    }

    /**
     * A single whose block throws delivers the exception to the error callback
     * and never invokes the success callback.
     */
    @Test
    fun testBasicFailure() = runBlocking<Unit> {
        expect(1)
        val single = rxSingle(coroutineContext) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        single.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java))
            Assert.assertThat(error.message, IsEqual("OK"))
        })
        expect(3)
        yield() // let the started coroutine run
        finish(6)
    }

    /**
     * Disposing the subscription while the coroutine is suspended cancels it:
     * neither callback fires and the code after the coroutine's `yield()` is never reached.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking<Unit> {
        expect(1)
        val single = rxSingle(coroutineContext) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed rx2 single
        val sub = single.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // let the started coroutine run
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /**
     * A single that returns a constant without suspending yields that constant.
     */
    @Test
    fun testSingleNoWait() {
        val single = rxSingle(CommonPool) {
            "OK"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /**
     * `await()` on an already-completed [Single] returns its value.
     */
    @Test
    fun testSingleAwait() = runBlocking {
        assertEquals("OK", Single.just("O").await() + "K")
    }

    /**
     * A value awaited from another [Single] inside [rxSingle] can be combined and emitted.
     */
    @Test
    fun testSingleEmitAndAwait() {
        val single = rxSingle(CommonPool) {
            Single.just("O").await() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /**
     * `awaitSingle()` on an observable that emits after a 50 ms timer still produces the value.
     */
    @Test
    fun testSingleWithDelay() {
        val single = rxSingle(CommonPool) {
            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /**
     * `awaitSingle()` on an observable with two items is expected to fail the single
     * with an [IllegalArgumentException].
     */
    @Test
    fun testSingleException() {
        val single = rxSingle(CommonPool) {
            Observable.just("O", "K").awaitSingle() + "K"
        }

        checkErroneous(single) {
            // JUnit's assertTrue always runs; Kotlin's assert() is skipped unless the JVM has -ea.
            assertTrue("Expected IllegalArgumentException but got $it", it is IllegalArgumentException)
        }
    }

    /**
     * `awaitFirst()` returns the first of several emitted items.
     */
    @Test
    fun testAwaitFirst() {
        val single = rxSingle(CommonPool) {
            Observable.just("O", "#").awaitFirst() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /**
     * `awaitLast()` returns the last of several emitted items.
     */
    @Test
    fun testAwaitLast() {
        val single = rxSingle(CommonPool) {
            Observable.just("#", "O").awaitLast() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /**
     * An error raised by an awaited observable can be caught inside the coroutine,
     * and the coroutine can then still complete with a value built from the error message.
     */
    @Test
    fun testExceptionFromObservable() {
        val single = rxSingle(CommonPool) {
            try {
                Observable.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                Observable.just(e.message!!).awaitLast() + "K"
            }
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /**
     * An exception thrown by the coroutine body (after awaiting a value used in its message)
     * is delivered as the single's error with type and message intact.
     */
    @Test
    fun testExceptionFromCoroutine() {
        val single = rxSingle<String>(CommonPool) {
            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
        }

        checkErroneous(single) {
            // JUnit's assertTrue always runs; Kotlin's assert() is skipped unless the JVM has -ea.
            assertTrue("Expected IllegalStateException but got $it", it is IllegalStateException)
            assertEquals("OK", it.message)
        }
    }
}