blob: 7e2e1f87cfaa0deedac6f40f44a677270e49ce58 [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.experimental.rx2
6
Roman Elizarov9fe5f462018-02-21 19:05:52 +03007import io.reactivex.*
8import io.reactivex.functions.*
9import io.reactivex.internal.functions.Functions.*
Roman Elizarovffc61ae2017-10-26 19:29:52 +030010import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030011import org.hamcrest.core.*
12import org.junit.*
13import org.junit.Assert.*
14import java.util.concurrent.*
15import kotlin.coroutines.experimental.*
Konrad Kamińskid6bb1482017-04-07 09:26:40 +020016
/**
 * Tests for the `rxMaybe` coroutine builder and for awaiting Rx2 sources from inside it.
 *
 * The `testBasic*` cases run inside `runBlocking` and pass the test's own `coroutineContext` to
 * `rxMaybe`; the numbered `expect(n)` / `finish(n)` markers inherited from [TestBase] spell out
 * the order in which the test body, the coroutine body and the subscriber callbacks are expected
 * to run.
 *
 * The remaining cases build the `Maybe` on `CommonPool` and verify the outcome through the
 * `checkMaybeValue` / `checkErroneous` helpers, which are defined outside this class.
 */
class MaybeTest : TestBase() {
    /**
     * Registers the Rx thread-name prefixes with `ignoreLostThreads` before every test.
     * Presumably this keeps [TestBase] from reporting Rx's pooled threads as leaked — confirm
     * against `TestBase`.
     */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * A coroutine that returns a value delivers it to the subscriber's success callback.
     * The coroutine body (4) and the callback (5) only run once the test yields.
     */
    @Test
    fun testBasicSuccess() = runBlocking<Unit> {
        expect(1)
        val maybe = rxMaybe(coroutineContext) {
            expect(4)
            "OK"
        }
        expect(2)
        maybe.subscribe { value ->
            expect(5)
            Assert.assertThat(value, IsEqual("OK"))
        }
        expect(3)
        yield() // let the started coroutine run
        finish(6)
    }

    /**
     * A coroutine that returns `null` completes the `Maybe` without a value: only the
     * completion [Action] is expected to fire.
     */
    @Test
    fun testBasicEmpty() = runBlocking<Unit> {
        expect(1)
        val maybe = rxMaybe(coroutineContext) {
            expect(4)
            null
        }
        expect(2)
        maybe.subscribe(emptyConsumer(), ON_ERROR_MISSING, Action {
            expect(5)
        })
        expect(3)
        yield() // let the started coroutine run
        finish(6)
    }

    /**
     * An exception thrown by the coroutine is delivered to the subscriber's error callback
     * with its type and message intact; the success callback must not be invoked.
     */
    @Test
    fun testBasicFailure() = runBlocking<Unit> {
        expect(1)
        val maybe = rxMaybe(coroutineContext) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        maybe.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java))
            Assert.assertThat(error.message, IsEqual("OK"))
        })
        expect(3)
        yield() // let the started coroutine run
        finish(6)
    }

    /**
     * Disposing the subscription while the coroutine is suspended cancels it: the code after
     * the coroutine's `yield()` must never run and neither subscriber callback may fire.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking<Unit> {
        expect(1)
        val maybe = rxMaybe(coroutineContext) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed rx2 maybe
        val sub = maybe.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // let the started coroutine run
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** A coroutine on `CommonPool` that returns immediately yields its value. */
    @Test
    fun testMaybeNoWait() {
        val maybe = rxMaybe(CommonPool) {
            "OK"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** `await()` on a `Maybe` holding a value returns that value. */
    @Test
    fun testMaybeAwait() = runBlocking {
        assertEquals("OK", Maybe.just("O").await() + "K")
    }

    /** `await()` on an empty `Maybe` returns `null`. */
    @Test
    fun testMaybeAwaitForNull() = runBlocking {
        assertNull(Maybe.empty<String>().await())
    }

    /** A value awaited from another `Maybe` inside the coroutine can be used to build the result. */
    @Test
    fun testMaybeEmitAndAwait() {
        val maybe = rxMaybe(CommonPool) {
            Maybe.just("O").await() + "K"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** The result is still delivered when the awaited source only emits after a 50 ms timer. */
    @Test
    fun testMaybeWithDelay() {
        val maybe = rxMaybe(CommonPool) {
            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /**
     * `awaitSingle()` on an `Observable` with two elements is expected to fail, and that failure
     * must surface from the `Maybe` as an [IllegalArgumentException].
     */
    @Test
    fun testMaybeException() {
        val maybe = rxMaybe(CommonPool) {
            Observable.just("O", "K").awaitSingle() + "K"
        }

        checkErroneous(maybe) {
            // JUnit's assertTrue always runs; Kotlin's assert() is skipped unless the JVM has -ea.
            assertTrue("Expected IllegalArgumentException, got $it", it is IllegalArgumentException)
        }
    }

    /** `awaitFirst()` returns the first element of a multi-element `Observable`. */
    @Test
    fun testAwaitFirst() {
        val maybe = rxMaybe(CommonPool) {
            Observable.just("O", "#").awaitFirst() + "K"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast()` returns the last element of a multi-element `Observable`. */
    @Test
    fun testAwaitLast() {
        val maybe = rxMaybe(CommonPool) {
            Observable.just("#", "O").awaitLast() + "K"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /**
     * An error emitted by an awaited `Observable` is thrown inside the coroutine, where it can
     * be caught and turned into a regular result.
     */
    @Test
    fun testExceptionFromObservable() {
        val maybe = rxMaybe(CommonPool) {
            try {
                Observable.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                Observable.just(e.message!!).awaitLast() + "K"
            }
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /**
     * An exception thrown by the coroutine after a successful await reaches the subscriber
     * with its original type and message.
     */
    @Test
    fun testExceptionFromCoroutine() {
        val maybe = rxMaybe<String>(CommonPool) {
            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
        }

        checkErroneous(maybe) {
            // JUnit's assertTrue always runs; Kotlin's assert() is skipped unless the JVM has -ea.
            assertTrue("Expected IllegalStateException, got $it", it is IllegalStateException)
            assertEquals("OK", it.message)
        }
    }
}