blob: 4db20f19d052db9977456081dff25e67d296b2a6 [file] [log] [blame]
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
package kotlinx.coroutines.experimental.rx2
import io.reactivex.Observable
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.Unconfined
import kotlinx.coroutines.experimental.launch
import org.junit.Assert.assertEquals
import org.junit.Test
* Test emitting multiple values with [rxObservable].
class ObservableMultiTest : TestBase() {
fun testNumbers() {
val n = 100 * stressTestMultiplier
val observable = rxObservable(CommonPool) {
repeat(n) { send(it) }
checkSingleValue(observable.toList()) { list ->
assertEquals((0..n - 1).toList(), list)
fun testConcurrentStress() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable<Int>(CommonPool) {
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
launch(CommonPool) {
jobs.forEach { it.join() }
checkSingleValue(observable.toList()) { list ->
assertEquals(n, list.size)
assertEquals((0..n - 1).toList(), list.sorted())
fun testIteratorResendUnconfined() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable(Unconfined) {
Observable.range(0, n).consumeEach { send(it) }
checkSingleValue(observable.toList()) { list ->
assertEquals((0..n - 1).toList(), list)
fun testIteratorResendPool() {
val n = 10_000 * stressTestMultiplier
val observable = rxObservable(CommonPool) {
Observable.range(0, n).consumeEach { send(it) }
checkSingleValue(observable.toList()) { list ->
assertEquals((0..n - 1).toList(), list)
fun testSendAndCrash() {
val observable = rxObservable(CommonPool) {
throw IOException("K")
val single = rxSingle(CommonPool) {
var result = ""
try {
observable.consumeEach { result += it }
} catch(e: IOException) {
result += e.message
checkSingleValue(single) {
assertEquals("OK", it)