blob: c938f897dbaea6f46d69e3595df00610f1fba12e [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
5package kotlinx.coroutines.experimental.rx2
6
7import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +03008import kotlinx.coroutines.experimental.channels.*
9import org.junit.*
10import org.junit.Assert.*
11import kotlin.coroutines.experimental.*
Roman Elizarov331750b2017-02-15 17:59:17 +030012
13class ConvertTest : TestBase() {
14 class TestException(s: String): RuntimeException(s)
15
16 @Test
17 fun testToCompletableSuccess() = runBlocking<Unit> {
18 expect(1)
Roman Elizarov43e3af72017-07-21 16:01:31 +030019 val job = launch(coroutineContext) {
Roman Elizarov331750b2017-02-15 17:59:17 +030020 expect(3)
21 }
Roman Elizarov43e3af72017-07-21 16:01:31 +030022 val completable = job.asCompletable(coroutineContext)
Roman Elizarov331750b2017-02-15 17:59:17 +030023 completable.subscribe {
24 expect(4)
25 }
26 expect(2)
27 yield()
28 finish(5)
29 }
30
31 @Test
32 fun testToCompletableFail() = runBlocking<Unit> {
33 expect(1)
Roman Elizarov43e3af72017-07-21 16:01:31 +030034 val job = async(coroutineContext + NonCancellable) { // don't kill parent on exception
Roman Elizarov331750b2017-02-15 17:59:17 +030035 expect(3)
36 throw RuntimeException("OK")
37 }
Roman Elizarov43e3af72017-07-21 16:01:31 +030038 val completable = job.asCompletable(coroutineContext)
Roman Elizarov331750b2017-02-15 17:59:17 +030039 completable.subscribe {
40 expect(4)
41 }
42 expect(2)
43 yield()
44 finish(5)
45 }
46
47 @Test
Konrad KamiƄskid6bb1482017-04-07 09:26:40 +020048 fun testToMaybe() {
49 val d = async(CommonPool) {
50 delay(50)
51 "OK"
52 }
53 val maybe1 = d.asMaybe(Unconfined)
54 checkMaybeValue(maybe1) {
55 assertEquals("OK", it)
56 }
57 val maybe2 = d.asMaybe(Unconfined)
58 checkMaybeValue(maybe2) {
59 assertEquals("OK", it)
60 }
61 }
62
63 @Test
64 fun testToMaybeEmpty() {
65 val d = async(CommonPool) {
66 delay(50)
67 null
68 }
69 val maybe1 = d.asMaybe(Unconfined)
70 checkMaybeValue(maybe1, ::assertNull)
71 val maybe2 = d.asMaybe(Unconfined)
72 checkMaybeValue(maybe2, ::assertNull)
73 }
74
75 @Test
76 fun testToMaybeFail() {
77 val d = async(CommonPool) {
78 delay(50)
79 throw TestException("OK")
80 }
81 val maybe1 = d.asMaybe(Unconfined)
82 checkErroneous(maybe1) {
83 check(it is TestException && it.message == "OK") { "$it" }
84 }
85 val maybe2 = d.asMaybe(Unconfined)
86 checkErroneous(maybe2) {
87 check(it is TestException && it.message == "OK") { "$it" }
88 }
89 }
90
91 @Test
Roman Elizarov331750b2017-02-15 17:59:17 +030092 fun testToSingle() {
93 val d = async(CommonPool) {
94 delay(50)
95 "OK"
96 }
Roman Elizarov3c3aed72017-03-09 12:31:59 +030097 val single1 = d.asSingle(Unconfined)
Roman Elizarov331750b2017-02-15 17:59:17 +030098 checkSingleValue(single1) {
99 assertEquals("OK", it)
100 }
Roman Elizarov3c3aed72017-03-09 12:31:59 +0300101 val single2 = d.asSingle(Unconfined)
Roman Elizarov331750b2017-02-15 17:59:17 +0300102 checkSingleValue(single2) {
103 assertEquals("OK", it)
104 }
105 }
106
107 @Test
108 fun testToSingleFail() {
109 val d = async(CommonPool) {
110 delay(50)
111 throw TestException("OK")
112 }
Roman Elizarov3c3aed72017-03-09 12:31:59 +0300113 val single1 = d.asSingle(Unconfined)
Roman Elizarov331750b2017-02-15 17:59:17 +0300114 checkErroneous(single1) {
115 check(it is TestException && it.message == "OK") { "$it" }
116 }
Roman Elizarov3c3aed72017-03-09 12:31:59 +0300117 val single2 = d.asSingle(Unconfined)
Roman Elizarov331750b2017-02-15 17:59:17 +0300118 checkErroneous(single2) {
119 check(it is TestException && it.message == "OK") { "$it" }
120 }
121 }
122
123 @Test
124 fun testToObservable() {
125 val c = produce(CommonPool) {
126 delay(50)
127 send("O")
128 delay(50)
129 send("K")
130 }
Roman Elizarov3c3aed72017-03-09 12:31:59 +0300131 val observable = c.asObservable(Unconfined)
Roman Elizarov331750b2017-02-15 17:59:17 +0300132 checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) {
133 assertEquals("OK", it)
134 }
135 }
136
137 @Test
138 fun testToObservableFail() {
139 val c = produce(CommonPool) {
140 delay(50)
141 send("O")
142 delay(50)
143 throw TestException("K")
144 }
Roman Elizarov3c3aed72017-03-09 12:31:59 +0300145 val observable = c.asObservable(Unconfined)
Roman Elizarov331750b2017-02-15 17:59:17 +0300146 val single = rxSingle(Unconfined) {
147 var result = ""
148 try {
Roman Elizarov7adb8762017-03-17 17:54:13 +0300149 observable.consumeEach { result += it }
Roman Elizarov331750b2017-02-15 17:59:17 +0300150 } catch(e: Throwable) {
151 check(e is TestException)
152 result += e.message
153 }
154 result
155 }
156 checkSingleValue(single) {
157 assertEquals("OK", it)
158 }
159 }
160}