blob: 6be8d8c1e4407e1ce9a8ba617ba548087d17b598 [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.experimental.rx1
6
7import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +03008import org.hamcrest.MatcherAssert.*
9import org.hamcrest.core.*
10import org.junit.*
11import org.junit.runner.*
12import org.junit.runners.*
13import rx.*
14import kotlin.coroutines.experimental.*
Roman Elizarov331750b2017-02-15 17:59:17 +030015
/**
 * Integration tests for observables created with `rxObservable` and consumed through the
 * suspending `await*` extensions, `consumeEach`, and `openSubscription`.
 *
 * The suite is parameterized: every test runs once for each combination of [ctx] and [delay]
 * produced by [params].
 *
 * @param ctx strategy that picks the coroutine context handed to `rxObservable`.
 * @param delay when `true`, the producer calls `delay(1)` around its emissions.
 */
@RunWith(Parameterized::class)
class IntegrationTest(
    val ctx: Ctx,
    val delay: Boolean
) : TestBase() {

    /**
     * Strategies for choosing the context an observable is built with, given the context of the
     * calling test coroutine.
     */
    enum class Ctx {
        /** Reuses the context that was passed in. */
        MAIN {
            override fun invoke(context: CoroutineContext): CoroutineContext = context
        },

        /** Ignores the argument and returns `CommonPool`. */
        COMMON_POOL {
            override fun invoke(context: CoroutineContext): CoroutineContext = CommonPool
        },

        /** Ignores the argument and returns `Unconfined`. */
        UNCONFINED {
            override fun invoke(context: CoroutineContext): CoroutineContext = Unconfined
        };

        /** Maps the caller's [context] to the context the observable should be created with. */
        abstract operator fun invoke(context: CoroutineContext): CoroutineContext
    }

    companion object {
        /**
         * Builds the parameter matrix: every [Ctx] value paired with `delay = false` and
         * `delay = true`, in that order.
         */
        @Parameterized.Parameters(name = "ctx={0}, delay={1}")
        @JvmStatic
        fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
            listOf(false, true).map { delay ->
                arrayOf<Any>(ctx, delay)
            }
        }
    }

    /**
     * An observable that completes without emitting anything: `awaitFirst`, `awaitLast` and
     * `awaitSingle` are expected to fail with [NoSuchElementException], the `OrDefault` / `OrNull` /
     * `OrElse` variants are expected to yield their fallback, and `consumeEach` must never invoke
     * its action.
     */
    @Test
    fun testEmpty() = runBlocking<Unit> {
        val observable = rxObservable<String>(ctx(coroutineContext)) {
            if (delay) delay(1)
            // does not send anything
        }
        assertNSE { observable.awaitFirst() }
        assertThat(observable.awaitFirstOrDefault("OK"), IsEqual("OK"))
        assertThat(observable.awaitFirstOrNull(), IsNull())
        assertThat(observable.awaitFirstOrElse { "ELSE" }, IsEqual("ELSE"))
        assertNSE { observable.awaitLast() }
        assertNSE { observable.awaitSingle() }
        var cnt = 0
        observable.consumeEach {
            cnt++
        }
        assertThat(cnt, IsEqual(0))
    }

    /**
     * An observable that emits exactly one element (`"OK"`): every `await*` variant is expected to
     * return that element, and `consumeEach` is expected to see it exactly once.
     */
    @Test
    fun testSingle() = runBlocking<Unit> {
        val observable = rxObservable<String>(ctx(coroutineContext)) {
            if (delay) delay(1)
            send("OK")
        }
        assertThat(observable.awaitFirst(), IsEqual("OK"))
        // The fallbacks deliberately differ from the emitted element; otherwise these assertions
        // could not tell "returned the element" apart from "returned the fallback".
        assertThat(observable.awaitFirstOrDefault("DEFAULT"), IsEqual("OK"))
        assertThat(observable.awaitFirstOrNull(), IsEqual("OK"))
        assertThat(observable.awaitFirstOrElse { "ELSE" }, IsEqual("OK"))
        assertThat(observable.awaitLast(), IsEqual("OK"))
        assertThat(observable.awaitSingle(), IsEqual("OK"))
        var cnt = 0
        observable.consumeEach {
            assertThat(it, IsEqual("OK"))
            cnt++
        }
        assertThat(cnt, IsEqual(1))
    }

    /**
     * An observable that emits a single `null`: every `await*` variant is expected to return
     * `null` (not its non-null fallback), and `consumeEach` is expected to see one `null`.
     */
    @Test
    fun testObservableWithNull() = runBlocking<Unit> {
        val observable = rxObservable<String?>(ctx(coroutineContext)) {
            if (delay) delay(1)
            send(null)
        }
        assertThat(observable.awaitFirst(), IsNull())
        assertThat(observable.awaitFirstOrDefault("OK"), IsNull())
        assertThat(observable.awaitFirstOrNull(), IsNull())
        assertThat(observable.awaitFirstOrElse { "OK" }, IsNull())
        assertThat(observable.awaitLast(), IsNull())
        assertThat(observable.awaitSingle(), IsNull())
        var cnt = 0
        observable.consumeEach {
            assertThat(it, IsNull())
            cnt++
        }
        assertThat(cnt, IsEqual(1))
    }

    /**
     * An observable that emits `1..n` with `n = 100 * stressTestMultiplier`: checks the first and
     * last elements, expects `awaitSingle` to fail with [IllegalArgumentException], and verifies
     * the full sequence both directly and after a round trip through
     * `openSubscription().asObservable(...)`.
     */
    @Test
    fun testNumbers() = runBlocking<Unit> {
        val n = 100 * stressTestMultiplier
        val observable = rxObservable<Int>(ctx(coroutineContext)) {
            for (i in 1..n) {
                send(i)
                if (delay) delay(1)
            }
        }
        assertThat(observable.awaitFirst(), IsEqual(1))
        assertThat(observable.awaitFirstOrDefault(0), IsEqual(1))
        assertThat(observable.awaitFirstOrNull(), IsEqual(1))
        assertThat(observable.awaitFirstOrElse { 0 }, IsEqual(1))
        assertThat(observable.awaitLast(), IsEqual(n))
        assertIAE { observable.awaitSingle() }
        checkNumbers(n, observable)
        val channel = observable.openSubscription()
        try {
            checkNumbers(n, channel.asObservable(ctx(coroutineContext)))
        } finally {
            // Cancel even when the check above fails, so the subscription is not left open.
            channel.cancel()
        }
    }

    /**
     * Consumes [observable] and asserts that it emits exactly the sequence `1, 2, ..., n`.
     *
     * @param n the expected number of elements, which is also the expected last value.
     * @param observable the source to consume.
     */
    private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
        var last = 0
        observable.consumeEach {
            assertThat(it, IsEqual(++last))
        }
        assertThat(last, IsEqual(n))
    }

    /**
     * Runs [block] and asserts that it fails with an [IllegalArgumentException].
     *
     * Declared `inline` so that [block] may contain suspending calls when invoked from a coroutine.
     */
    inline fun assertIAE(block: () -> Unit) {
        val failure: Throwable? = try {
            block()
            null
        } catch (e: Throwable) {
            e
        }
        // Reported outside the `try` so that whatever `expectUnreached` throws is not swallowed by
        // the `catch` above and misreported as an exception of the wrong type.
        if (failure == null) expectUnreached()
        assertThat(failure, IsInstanceOf(IllegalArgumentException::class.java))
    }

    /**
     * Runs [block] and asserts that it fails with a [NoSuchElementException].
     *
     * Declared `inline` so that [block] may contain suspending calls when invoked from a coroutine.
     */
    inline fun assertNSE(block: () -> Unit) {
        val failure: Throwable? = try {
            block()
            null
        } catch (e: Throwable) {
            e
        }
        // Reported outside the `try` so that whatever `expectUnreached` throws is not swallowed by
        // the `catch` above and misreported as an exception of the wrong type.
        if (failure == null) expectUnreached()
        assertThat(failure, IsInstanceOf(NoSuchElementException::class.java))
    }
}