blob: 4db20f19d052db9977456081dff25e67d296b2a6 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
5package kotlinx.coroutines.experimental.rx2
6
Roman Elizarov86349be2017-03-17 16:47:37 +03007import io.reactivex.Observable
Roman Elizarov331750b2017-02-15 17:59:17 +03008import kotlinx.coroutines.experimental.CommonPool
9import kotlinx.coroutines.experimental.TestBase
10import kotlinx.coroutines.experimental.Unconfined
11import kotlinx.coroutines.experimental.launch
12import org.junit.Assert.assertEquals
13import org.junit.Test
Roman Elizarov331750b2017-02-15 17:59:17 +030014import java.io.IOException
15
16/**
17 * Test emitting multiple values with [rxObservable].
18 */
19class ObservableMultiTest : TestBase() {
20 @Test
21 fun testNumbers() {
22 val n = 100 * stressTestMultiplier
23 val observable = rxObservable(CommonPool) {
24 repeat(n) { send(it) }
25 }
26 checkSingleValue(observable.toList()) { list ->
27 assertEquals((0..n - 1).toList(), list)
28 }
29 }
30
31 @Test
32 fun testConcurrentStress() {
33 val n = 10_000 * stressTestMultiplier
34 val observable = rxObservable<Int>(CommonPool) {
35 // concurrent emitters (many coroutines)
36 val jobs = List(n) {
37 // launch
38 launch(CommonPool) {
39 send(it)
40 }
41 }
42 jobs.forEach { it.join() }
43 }
44 checkSingleValue(observable.toList()) { list ->
45 assertEquals(n, list.size)
46 assertEquals((0..n - 1).toList(), list.sorted())
47 }
48 }
49
50 @Test
51 fun testIteratorResendUnconfined() {
52 val n = 10_000 * stressTestMultiplier
53 val observable = rxObservable(Unconfined) {
Roman Elizarov86349be2017-03-17 16:47:37 +030054 Observable.range(0, n).consumeEach { send(it) }
Roman Elizarov331750b2017-02-15 17:59:17 +030055 }
56 checkSingleValue(observable.toList()) { list ->
57 assertEquals((0..n - 1).toList(), list)
58 }
59 }
60
61 @Test
62 fun testIteratorResendPool() {
63 val n = 10_000 * stressTestMultiplier
64 val observable = rxObservable(CommonPool) {
Roman Elizarov86349be2017-03-17 16:47:37 +030065 Observable.range(0, n).consumeEach { send(it) }
Roman Elizarov331750b2017-02-15 17:59:17 +030066 }
67 checkSingleValue(observable.toList()) { list ->
68 assertEquals((0..n - 1).toList(), list)
69 }
70 }
71
72 @Test
73 fun testSendAndCrash() {
74 val observable = rxObservable(CommonPool) {
75 send("O")
76 throw IOException("K")
77 }
78 val single = rxSingle(CommonPool) {
79 var result = ""
80 try {
Roman Elizarov35af7c82017-03-17 18:08:29 +030081 observable.consumeEach { result += it }
Roman Elizarov331750b2017-02-15 17:59:17 +030082 } catch(e: IOException) {
83 result += e.message
84 }
85 result
86 }
87 checkSingleValue(single) {
88 assertEquals("OK", it)
89 }
90 }
91}