blob: 1777dc151a9c3f7235f35ddfe463697ea91cfe40 [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental.rx2
18
Roman Elizarov9fe5f462018-02-21 19:05:52 +030019import kotlinx.coroutines.experimental.*
20import org.hamcrest.core.*
21import org.junit.*
22import kotlin.coroutines.experimental.*
Roman Elizarov50e32212017-03-10 17:40:50 +030023
/**
 * Tests for the `rxFlowable` coroutine builder covering three basic scenarios:
 * a successful emission, a failure inside the builder block, and disposal of
 * the subscription while the builder coroutine is suspended.
 *
 * The numbered `expect(n)` / `finish(n)` / `expectUnreached()` calls come from
 * [TestBase] (not shown in this file); they presumably assert that the marked
 * points are reached in exactly the numbered order.
 */
class FlowableTest : TestBase() {
    /**
     * A value sent from the builder block is delivered to the subscriber.
     *
     * Per the numbering, the builder block (4) does not start until the test
     * body yields after subscribing (3), and the subscriber sees the value (5)
     * before the test body resumes (6).
     */
    @Test
    fun testBasicSuccess() = runBlocking<Unit> {
        expect(1)
        val flowable = rxFlowable(coroutineContext) {
            expect(4)
            send("OK")
        }
        expect(2)
        flowable.subscribe { value ->
            expect(5)
            Assert.assertThat(value, IsEqual("OK"))
        }
        expect(3)
        yield() // give the started coroutine a chance to run
        finish(6)
    }

    /**
     * An exception thrown from the builder block is delivered to the
     * subscriber's error callback; the value callback must never be invoked.
     */
    @Test
    fun testBasicFailure() = runBlocking<Unit> {
        expect(1)
        val flowable = rxFlowable<String>(coroutineContext) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        flowable.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java))
            Assert.assertThat(error.message, IsEqual("OK"))
        })
        expect(3)
        yield() // give the started coroutine a chance to run
        finish(6)
    }

    /**
     * Disposing the subscription while the builder block is suspended in
     * `yield()` prevents the block from continuing, and neither subscriber
     * callback is invoked.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking<Unit> {
        expect(1)
        val flowable = rxFlowable<String>(coroutineContext) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        val subscription = flowable.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // give the started coroutine a chance to run
        expect(5)
        subscription.dispose() // will cancel coroutine
        yield() // let the cancellation take effect before finishing
        finish(6)
    }
}
83}