blob: 0908b34cf256c3e88912088e2794849ee48fc74b [file] [log] [blame]
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.rx2
6
7import kotlinx.coroutines.*
8import kotlinx.coroutines.flow.*
Dmitry Khalanskiy387d1dd2020-02-10 18:42:28 +03009import org.junit.Test
10import kotlin.test.*
Vsevolod Tolstopyatov170690f2019-04-09 12:33:57 +030011
/**
 * Tests for `asObservable()` applied to a `flow { }`.
 *
 * Every test numbers its checkpoints with `expect(n)` / `finish(n)`. Those helpers, together with
 * `runTest`, `hang`, `expectUnreached` and `TestException`, are not defined in this file
 * (presumably they come from [TestBase]); the numbering is assumed to assert the order in which
 * the checkpoints are reached.
 */
class FlowAsObservableTest : TestBase() {
    /** A single emitted value reaches the `onNext` consumer after subscription. */
    @Test
    fun testBasicSuccess() = runTest {
        expect(1)
        val observable = flow {
            expect(3)
            emit("OK")
        }.asObservable()

        expect(2)
        observable.subscribe { value ->
            expect(4)
            assertEquals("OK", value)
        }

        finish(5)
    }

    /** An exception thrown by the flow body is delivered to `onError`; `onNext` must never fire. */
    @Test
    fun testBasicFailure() = runTest {
        expect(1)
        val observable = flow<Int> {
            expect(3)
            throw RuntimeException("OK")
        }.asObservable()

        expect(2)
        observable.subscribe({ expectUnreached() }, { error ->
            expect(4)
            assertTrue(error is RuntimeException)
            assertEquals("OK", error.message)
        })
        finish(5)
    }

    /**
     * Disposes the subscription right after subscribing; neither `onNext` nor `onError` may fire.
     *
     * NOTE(review): `hang` is defined outside this file; presumably it suspends until cancelled
     * and then runs its block, which is why `expect(4)` sits between `dispose()` and `finish(5)`.
     */
    @Test
    fun testBasicUnsubscribe() = runTest {
        expect(1)
        val observable = flow<Int> {
            expect(3)
            hang {
                expect(4)
            }
        }.asObservable()

        expect(2)
        val sub = observable.subscribe({ expectUnreached() }, { expectUnreached() })
        sub.dispose() // will cancel coroutine
        finish(5)
    }

    /**
     * Collects the observable from a child coroutine and then cancels that coroutine.
     * The `doOnDispose` hook owns exactly one checkpoint (6), placed before the `hang`
     * block's checkpoint (7).
     */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        val observable =
            flow {
                expect(3)
                emit("OK")
                hang {
                    expect(7)
                }
            }.asObservable()
                .doOnNext {
                    expect(4)
                    assertEquals("OK", it)
                }
                .doOnDispose {
                    expect(6) // notified once!
                }

        expect(1)
        // UNDISPATCHED: the child starts running immediately, so checkpoint 2 follows checkpoint 1.
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            observable.collect {
                expect(5)
                assertEquals("OK", it)
            }
        }

        yield()
        job.cancelAndJoin()
        finish(8)
    }

    /**
     * The collector throws [TestException] on the first value. The exception is caught by the
     * surrounding `try`, and the `hang` block's checkpoint (4) is numbered before the
     * `catch` branch's `finish(5)`.
     */
    @Test
    fun testFailingConsumer() = runTest {
        expect(1)
        val observable = flow {
            expect(2)
            emit("OK")
            hang {
                expect(4)
            }
        }.asObservable()

        try {
            observable.collect {
                expect(3)
                throw TestException()
            }
        } catch (e: TestException) {
            finish(5)
        }
    }

    /**
     * Subscribes and immediately disposes while running in [Dispatchers.Unconfined].
     * The flow body's checkpoint (1) still precedes `finish(2)`, and none of the three
     * subscriber callbacks (`onNext`, `onError`, `onComplete`) may be invoked.
     */
    @Test
    fun testNonAtomicStart() = runTest {
        withContext(Dispatchers.Unconfined) {
            val observable = flow<Int> {
                expect(1)
            }.asObservable()

            val disposable = observable.subscribe({ expectUnreached() }, { expectUnreached() }, { expectUnreached() })
            disposable.dispose()
        }
        finish(2)
    }

    /**
     * The flow emits one value and then cancels its own coroutine context.
     * The subscriber must see that value in `onNext` (2) and then `onComplete` (3);
     * `onError` must not fire, and code after `ensureActive()` must not run.
     */
    @Test
    fun testFlowCancelledFromWithin() = runTest {
        val observable = flow {
            expect(1)
            emit(1)
            kotlin.coroutines.coroutineContext.cancel()
            kotlin.coroutines.coroutineContext.ensureActive()
            expectUnreached()
        }.asObservable()

        observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) })
    }
}