blob: 2de3a2abc9098e582bb168c3d0dd3d5bd2ad5818 [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.experimental.rx1
6
7import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +03008import kotlinx.coroutines.experimental.channels.*
9import org.hamcrest.core.*
10import org.junit.*
11import org.junit.Assert.*
12import kotlin.coroutines.experimental.*
Roman Elizarov331750b2017-02-15 17:59:17 +030013
/**
 * Tests for the conversions from coroutine primitives to RxJava 1.x types:
 * `asCompletable`, `asSingle` and `asObservable`.
 *
 * `expect(n)` and `finish(n)` are inherited from [TestBase]; presumably they assert that the
 * numbered steps are reached in exactly that order (confirm against TestBase).
 * `checkSingleValue` and `checkErroneous` are helpers defined elsewhere in this package.
 */
class ConvertTest : TestBase() {
    /** Dedicated exception type, so failure tests can tell their own error apart from any other. */
    class TestException(s: String): RuntimeException(s)

    /**
     * A job that completes normally is converted to a completable and the subscriber's
     * callback runs after the job body.
     */
    @Test
    fun testToCompletableSuccess() = runBlocking<Unit> {
        expect(1)
        val job = launch(coroutineContext) {
            expect(3)
        }
        val completable = job.asCompletable(coroutineContext)
        completable.subscribe {
            expect(4)
        }
        expect(2)
        // Let the launched job and the completable run before finishing.
        yield()
        finish(5)
    }

    /**
     * A job that fails with an exception is converted to a completable.
     *
     * NOTE(review): the subscriber callback is still expected to run (step 4) although the job
     * throws; presumably `asCompletable` only waits for the job to finish and does not
     * propagate its failure. Confirm against the `asCompletable` implementation.
     */
    @Test
    fun testToCompletableFail() = runBlocking<Unit> {
        expect(1)
        val job = async(coroutineContext + NonCancellable) { // don't kill parent on exception
            expect(3)
            throw RuntimeException("OK")
        }
        val completable = job.asCompletable(coroutineContext)
        completable.subscribe {
            expect(4)
        }
        expect(2)
        // Let the failing job and the completable run before finishing.
        yield()
        finish(5)
    }

    /**
     * A deferred value is converted to a single that yields the deferred's result.
     * The conversion is done twice on the same deferred and both singles must yield "OK".
     */
    @Test
    fun testToSingle() {
        val d = async(CommonPool) {
            delay(50)
            "OK"
        }
        val single1 = d.asSingle(Unconfined)
        checkSingleValue(single1) {
            assertThat(it, IsEqual("OK"))
        }
        val single2 = d.asSingle(Unconfined)
        checkSingleValue(single2) {
            assertThat(it, IsEqual("OK"))
        }
    }

    /**
     * A deferred value that fails with [TestException] is converted to a single that reports
     * that same exception type and message. The conversion is done twice on the same deferred.
     */
    @Test
    fun testToSingleFail() {
        val d = async(CommonPool) {
            delay(50)
            throw TestException("OK")
        }
        val single1 = d.asSingle(Unconfined)
        checkErroneous(single1) {
            assertThat(it, IsInstanceOf(TestException::class.java))
            assertThat(it.message, IsEqual("OK"))
        }
        val single2 = d.asSingle(Unconfined)
        checkErroneous(single2) {
            assertThat(it, IsInstanceOf(TestException::class.java))
            assertThat(it.message, IsEqual("OK"))
        }
    }

    /**
     * A producer channel sending "O" and then "K" is converted to an observable;
     * concatenating the emitted items must give "OK".
     */
    @Test
    fun testToObservable() {
        val c = produce(CommonPool) {
            delay(50)
            send("O")
            delay(50)
            send("K")
        }
        val observable = c.asObservable(Unconfined)
        checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }) {
            assertThat(it, IsEqual("OK"))
        }
    }

    /**
     * A producer channel that sends "O" and then fails with `TestException("K")` is converted
     * to an observable. The consumer appends each received item and then the exception's
     * message, so the combined result must be "OK".
     */
    @Test
    fun testToObservableFail() {
        val c = produce(CommonPool) {
            delay(50)
            send("O")
            delay(50)
            throw TestException("K")
        }
        val observable = c.asObservable(Unconfined)
        val single = rxSingle(Unconfined) {
            var result = ""
            try {
                observable.consumeEach { result += it }
            } catch (e: TestException) {
                // Only the expected failure is handled here; any other exception propagates
                // unchanged so that a broken test reports its real cause.
                result += e.message
            }
            result
        }
        checkSingleValue(single) {
            assertThat(it, IsEqual("OK"))
        }
    }
}