blob: 49a2abe6d5040bf9a311890e33bd22d59afe596c [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental.rx2
18
Roman Elizarov86349be2017-03-17 16:47:37 +030019import io.reactivex.Observable
Roman Elizarov331750b2017-02-15 17:59:17 +030020import kotlinx.coroutines.experimental.CommonPool
21import kotlinx.coroutines.experimental.TestBase
22import kotlinx.coroutines.experimental.Unconfined
23import kotlinx.coroutines.experimental.launch
24import org.junit.Assert.assertEquals
25import org.junit.Test
Roman Elizarov331750b2017-02-15 17:59:17 +030026import java.io.IOException
27
/**
 * Tests for [rxObservable] builders that emit more than one value.
 *
 * Every test builds an observable (or a single), then hands it to `checkSingleValue`
 * together with a lambda that asserts on the one value that is produced.
 */
class ObservableMultiTest : TestBase() {
    /**
     * Sends the integers `0 until count` one after another from a coroutine started in
     * [CommonPool] and expects the collected list to contain them in the same order.
     */
    @Test
    fun testNumbers() {
        val count = 100 * stressTestMultiplier
        val numbers = rxObservable(CommonPool) {
            for (value in 0 until count) {
                send(value)
            }
        }
        checkSingleValue(numbers.toList()) { received ->
            assertEquals((0 until count).toList(), received)
        }
    }

    /**
     * Launches one coroutine per value in [CommonPool], each of which sends its own index,
     * and waits for all of them before the producer block finishes.
     *
     * Arrival order is not asserted: the received list is checked for its size and,
     * after sorting, for its content.
     */
    @Test
    fun testConcurrentStress() {
        val count = 10_000 * stressTestMultiplier
        val numbers = rxObservable<Int>(CommonPool) {
            // One emitter coroutine per value; all of them are started before any join.
            val emitters = (0 until count).map { value ->
                launch(CommonPool) {
                    send(value)
                }
            }
            for (emitter in emitters) {
                emitter.join()
            }
        }
        checkSingleValue(numbers.toList()) { received ->
            assertEquals(count, received.size)
            assertEquals((0 until count).toList(), received.sorted())
        }
    }

    /**
     * Re-sends every element of `Observable.range(0, count)` from a producer running in the
     * [Unconfined] context and expects the collected list to keep the original order.
     */
    @Test
    fun testIteratorResendUnconfined() {
        val count = 10_000 * stressTestMultiplier
        val resent = rxObservable(Unconfined) {
            Observable.range(0, count).consumeEach { value -> send(value) }
        }
        checkSingleValue(resent.toList()) { received ->
            assertEquals((0 until count).toList(), received)
        }
    }

    /**
     * Same scenario as [testIteratorResendUnconfined], but the producer runs in [CommonPool].
     */
    @Test
    fun testIteratorResendPool() {
        val count = 10_000 * stressTestMultiplier
        val resent = rxObservable(CommonPool) {
            Observable.range(0, count).consumeEach { value -> send(value) }
        }
        checkSingleValue(resent.toList()) { received ->
            assertEquals((0 until count).toList(), received)
        }
    }

    /**
     * The producer sends `"O"` and then throws an [IOException] whose message is `"K"`.
     *
     * The consumer appends every received item and, when the [IOException] is caught,
     * its message; the resulting text is expected to be `"OK"`.
     */
    @Test
    fun testSendAndCrash() {
        val crashing = rxObservable(CommonPool) {
            send("O")
            throw IOException("K")
        }
        val collected = rxSingle(CommonPool) {
            val text = StringBuilder()
            try {
                crashing.consumeEach { item -> text.append(item) }
            } catch (e: IOException) {
                // The failure message becomes the tail of the collected text.
                text.append(e.message)
            }
            text.toString()
        }
        checkSingleValue(collected) { value ->
            assertEquals("OK", value)
        }
    }
}