blob: e0cfa9887f58a75de1371e6baa73a061388829bd [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental.reactor
18
Roman Elizarov9fe5f462018-02-21 19:05:52 +030019import kotlinx.coroutines.experimental.*
20import kotlinx.coroutines.experimental.channels.*
21import kotlinx.coroutines.experimental.reactive.*
22import org.junit.*
23import org.junit.Assert.*
24import kotlin.coroutines.experimental.*
Konrad Kamiński3ae898c2017-03-30 17:37:00 +020025
/**
 * Tests for converting jobs, deferred values and produced channels into
 * Reactor publishers through `asMono` and `asFlux`.
 *
 * The numbers passed to `expect` and `finish` spell out the order in which
 * the individual steps of a test are meant to run (helpers presumably come
 * from [TestBase]).
 */
class ConvertTest : TestBase() {
    /** Exception type thrown by the failing coroutines below so that checks can recognise it. */
    class TestException(s: String) : RuntimeException(s)

    /**
     * A launched job converted with `asMono` is subscribed to; the job body
     * (step 3) and the subscriber callback (step 4) are both expected to run
     * between steps 2 and 5, i.e. while the test yields.
     */
    @Test
    fun testJobToMonoSuccess() = runBlocking<Unit> {
        expect(1)
        val child = launch(coroutineContext) { expect(3) }
        val childMono = child.asMono(coroutineContext)
        childMono.subscribe { expect(4) }
        expect(2)
        yield()
        finish(5)
    }

    /**
     * A coroutine that throws is converted with `asMono`; the subscriber must
     * see the error callback (step 4) and never the item callback.
     */
    @Test
    fun testJobToMonoFail() = runBlocking<Unit> {
        expect(1)
        // NonCancellable in the context: don't kill parent on exception
        val failing = async(coroutineContext + NonCancellable) {
            expect(3)
            throw RuntimeException("OK")
        }
        val failingMono = failing.asMono(coroutineContext)
        failingMono.subscribe(
            { fail("no item should be emitted") },
            { expect(4) }
        )
        expect(2)
        yield()
        finish(5)
    }

    /**
     * The same deferred is converted to a mono twice, and both monos must
     * yield the value "OK".
     */
    @Test
    fun testDeferredToMono() {
        val deferred = async(CommonPool) {
            delay(50)
            "OK"
        }
        val firstMono = deferred.asMono(Unconfined)
        checkMonoValue(firstMono) { value -> assertEquals("OK", value) }
        // Convert the very same deferred a second time.
        val secondMono = deferred.asMono(Unconfined)
        checkMonoValue(secondMono) { value -> assertEquals("OK", value) }
    }

    /**
     * A deferred that completes with `null` is converted twice; each mono is
     * checked with `assertNull`.
     */
    @Test
    fun testDeferredToMonoEmpty() {
        val deferred = async(CommonPool) {
            delay(50)
            null
        }
        val firstMono = deferred.asMono(Unconfined)
        checkMonoValue(firstMono, ::assertNull)
        // Convert the very same deferred a second time.
        val secondMono = deferred.asMono(Unconfined)
        checkMonoValue(secondMono, ::assertNull)
    }

    /**
     * A deferred that fails with [TestException] is converted twice; each mono
     * must report that exception with the message "OK".
     */
    @Test
    fun testDeferredToMonoFail() {
        val deferred = async(CommonPool) {
            delay(50)
            throw TestException("OK")
        }
        val firstMono = deferred.asMono(Unconfined)
        checkErroneous(firstMono) { failure ->
            check(failure is TestException && failure.message == "OK") { "$failure" }
        }
        // Convert the very same deferred a second time.
        val secondMono = deferred.asMono(Unconfined)
        checkErroneous(secondMono) { failure ->
            check(failure is TestException && failure.message == "OK") { "$failure" }
        }
    }

    /**
     * A produced channel sending "O" and then "K" is converted with `asFlux`;
     * concatenating the flux elements must give "OK".
     */
    @Test
    fun testToFlux() {
        val letters = produce(CommonPool) {
            delay(50)
            send("O")
            delay(50)
            send("K")
        }
        val letterFlux = letters.asFlux(Unconfined)
        val joined = letterFlux.reduce { acc, next -> acc + next }
        checkMonoValue(joined) { value -> assertEquals("OK", value) }
    }

    /**
     * A produced channel sends "O" and then fails with `TestException("K")`.
     * The consumer appends every received element and then the exception
     * message, so the combined result must be "OK".
     */
    @Test
    fun testToFluxFail() {
        val letters = produce(CommonPool) {
            delay(50)
            send("O")
            delay(50)
            throw TestException("K")
        }
        val letterFlux = letters.asFlux(Unconfined)
        val collectedMono = mono(Unconfined) {
            var collected = ""
            try {
                letterFlux.consumeEach { item -> collected += item }
            } catch (failure: Throwable) {
                // Only the producer's own exception is acceptable here.
                check(failure is TestException)
                collected += failure.message
            }
            collected
        }
        checkMonoValue(collectedMono) { value -> assertEquals("OK", value) }
    }
}