blob: c716a0cc800269abbd8fd4bda498d6cd64e79221 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental.rx2
18
19import kotlinx.coroutines.experimental.*
20import kotlinx.coroutines.experimental.channels.produce
21import org.junit.Assert.assertEquals
22import org.junit.Test
23
24class ConvertTest : TestBase() {
25 class TestException(s: String): RuntimeException(s)
26
27 @Test
28 fun testToCompletableSuccess() = runBlocking<Unit> {
29 expect(1)
30 val job = launch(context) {
31 expect(3)
32 }
33 val completable = job.toCompletable(context)
34 completable.subscribe {
35 expect(4)
36 }
37 expect(2)
38 yield()
39 finish(5)
40 }
41
42 @Test
43 fun testToCompletableFail() = runBlocking<Unit> {
44 expect(1)
45 val job = async(context + NonCancellable) { // don't kill parent on exception
46 expect(3)
47 throw RuntimeException("OK")
48 }
49 val completable = job.toCompletable(context)
50 completable.subscribe {
51 expect(4)
52 }
53 expect(2)
54 yield()
55 finish(5)
56 }
57
58 @Test
59 fun testToSingle() {
60 val d = async(CommonPool) {
61 delay(50)
62 "OK"
63 }
64 val single1 = d.toSingle(Unconfined)
65 checkSingleValue(single1) {
66 assertEquals("OK", it)
67 }
68 val single2 = d.toSingle(Unconfined)
69 checkSingleValue(single2) {
70 assertEquals("OK", it)
71 }
72 }
73
74 @Test
75 fun testToSingleFail() {
76 val d = async(CommonPool) {
77 delay(50)
78 throw TestException("OK")
79 }
80 val single1 = d.toSingle(Unconfined)
81 checkErroneous(single1) {
82 check(it is TestException && it.message == "OK") { "$it" }
83 }
84 val single2 = d.toSingle(Unconfined)
85 checkErroneous(single2) {
86 check(it is TestException && it.message == "OK") { "$it" }
87 }
88 }
89
90 @Test
91 fun testToObservable() {
92 val c = produce(CommonPool) {
93 delay(50)
94 send("O")
95 delay(50)
96 send("K")
97 }
98 val observable = c.toObservable(Unconfined)
99 checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) {
100 assertEquals("OK", it)
101 }
102 }
103
104 @Test
105 fun testToObservableFail() {
106 val c = produce(CommonPool) {
107 delay(50)
108 send("O")
109 delay(50)
110 throw TestException("K")
111 }
112 val observable = c.toObservable(Unconfined)
113 val single = rxSingle(Unconfined) {
114 var result = ""
115 try {
116 for (x in observable)
117 result += x
118 } catch(e: Throwable) {
119 check(e is TestException)
120 result += e.message
121 }
122 result
123 }
124 checkSingleValue(single) {
125 assertEquals("OK", it)
126 }
127 }
128}