blob: 89ba18e0e4931ebd9109a1a5551c233c778e8446 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental.rx2
18
Roman Elizarov9fe5f462018-02-21 19:05:52 +030019import io.reactivex.*
Roman Elizarov331750b2017-02-15 17:59:17 +030020import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030021import org.hamcrest.MatcherAssert.*
22import org.hamcrest.core.*
23import org.junit.*
24import org.junit.runner.*
25import org.junit.runners.*
26import kotlin.coroutines.experimental.*
Roman Elizarov331750b2017-02-15 17:59:17 +030027
28@RunWith(Parameterized::class)
29class IntegrationTest(
30 val ctx: Ctx,
31 val delay: Boolean
32) : TestBase() {
33
34 enum class Ctx {
35 MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context },
36 COMMON_POOL { override fun invoke(context: CoroutineContext): CoroutineContext = CommonPool },
37 UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Unconfined };
38
39 abstract operator fun invoke(context: CoroutineContext): CoroutineContext
40 }
41
42 companion object {
43 @Parameterized.Parameters(name = "ctx={0}, delay={1}")
44 @JvmStatic
45 fun params(): Collection<Array<Any>> = Ctx.values().flatMap { ctx ->
46 listOf(false, true).map { delay ->
47 arrayOf<Any>(ctx, delay)
48 }
49 }
50 }
51
52 @Test
Roman Elizarov4a67afb2017-03-16 11:10:47 +030053 fun testEmpty(): Unit = runBlocking {
Roman Elizarov43e3af72017-07-21 16:01:31 +030054 val observable = rxObservable<String>(ctx(coroutineContext)) {
Roman Elizarov4a67afb2017-03-16 11:10:47 +030055 if (delay) delay(1)
56 // does not send anything
57 }
Roman Elizarov86349be2017-03-17 16:47:37 +030058 assertNSE { observable.awaitFirst() }
59 assertThat(observable.awaitFirstOrDefault("OK"), IsEqual("OK"))
Konrad Kamińskib8ed47c2018-02-06 12:17:53 +010060 assertThat(observable.awaitFirstOrNull(), IsNull())
61 assertThat(observable.awaitFirstOrElse { "ELSE" }, IsEqual("ELSE"))
Roman Elizarov86349be2017-03-17 16:47:37 +030062 assertNSE { observable.awaitLast() }
63 assertNSE { observable.awaitSingle() }
Roman Elizarov4a67afb2017-03-16 11:10:47 +030064 var cnt = 0
Roman Elizarov86349be2017-03-17 16:47:37 +030065 observable.consumeEach {
Roman Elizarov4a67afb2017-03-16 11:10:47 +030066 cnt++
67 }
68 assertThat(cnt, IsEqual(0))
69 }
70
71 @Test
Roman Elizarov331750b2017-02-15 17:59:17 +030072 fun testSingle() = runBlocking<Unit> {
Roman Elizarov43e3af72017-07-21 16:01:31 +030073 val observable = rxObservable<String>(ctx(coroutineContext)) {
Roman Elizarov331750b2017-02-15 17:59:17 +030074 if (delay) delay(1)
75 send("OK")
76 }
77 assertThat(observable.awaitFirst(), IsEqual("OK"))
Konrad Kamińskib8ed47c2018-02-06 12:17:53 +010078 assertThat(observable.awaitFirstOrDefault("OK"), IsEqual("OK"))
79 assertThat(observable.awaitFirstOrNull(), IsEqual("OK"))
80 assertThat(observable.awaitFirstOrElse { "ELSE" }, IsEqual("OK"))
Roman Elizarov331750b2017-02-15 17:59:17 +030081 assertThat(observable.awaitLast(), IsEqual("OK"))
82 assertThat(observable.awaitSingle(), IsEqual("OK"))
83 var cnt = 0
Roman Elizarov86349be2017-03-17 16:47:37 +030084 observable.consumeEach {
85 assertThat(it, IsEqual("OK"))
Roman Elizarov331750b2017-02-15 17:59:17 +030086 cnt++
87 }
88 assertThat(cnt, IsEqual(1))
89 }
90
91 @Test
92 fun testNumbers() = runBlocking<Unit> {
93 val n = 100 * stressTestMultiplier
Roman Elizarov43e3af72017-07-21 16:01:31 +030094 val observable = rxObservable<Int>(ctx(coroutineContext)) {
Roman Elizarov331750b2017-02-15 17:59:17 +030095 for (i in 1..n) {
96 send(i)
97 if (delay) delay(1)
98 }
99 }
100 assertThat(observable.awaitFirst(), IsEqual(1))
Konrad Kamińskib8ed47c2018-02-06 12:17:53 +0100101 assertThat(observable.awaitFirstOrDefault(0), IsEqual(1))
102 assertThat(observable.awaitFirstOrNull(), IsEqual(1))
103 assertThat(observable.awaitFirstOrElse { 0 }, IsEqual(1))
Roman Elizarov331750b2017-02-15 17:59:17 +0300104 assertThat(observable.awaitLast(), IsEqual(n))
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300105 assertIAE { observable.awaitSingle() }
Roman Elizarov331750b2017-02-15 17:59:17 +0300106 checkNumbers(n, observable)
Roman Elizarov0f66a6d2017-06-22 14:57:53 +0300107 val channel = observable.openSubscription()
Roman Elizarov43e3af72017-07-21 16:01:31 +0300108 checkNumbers(n, channel.asObservable(ctx(coroutineContext)))
Marko Devcic1d6230a2018-04-04 20:13:08 +0200109 channel.cancel()
Roman Elizarov331750b2017-02-15 17:59:17 +0300110 }
111
112 private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
113 var last = 0
Roman Elizarov86349be2017-03-17 16:47:37 +0300114 observable.consumeEach {
115 assertThat(it, IsEqual(++last))
Roman Elizarov331750b2017-02-15 17:59:17 +0300116 }
117 assertThat(last, IsEqual(n))
118 }
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300119
120
121 inline fun assertIAE(block: () -> Unit) {
122 try {
123 block()
124 expectUnreached()
125 } catch (e: Throwable) {
126 assertThat(e, IsInstanceOf(IllegalArgumentException::class.java))
127 }
128 }
129
130 inline fun assertNSE(block: () -> Unit) {
131 try {
132 block()
133 expectUnreached()
134 } catch (e: Throwable) {
135 assertThat(e, IsInstanceOf(NoSuchElementException::class.java))
136 }
137 }
Roman Elizarov331750b2017-02-15 17:59:17 +0300138}