blob: cc336ba6b501228f991a927fe7184c7bd253a589 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.junit.*
import org.junit.Test
import reactor.core.publisher.*
import java.time.Duration.*
import kotlin.test.*
class FluxSingleTest : TestBase() {
@Before
fun setup() {
ignoreLostThreads("parallel-")
}
@Test
fun testSingleNoWait() {
val flux = flux {
send("OK")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testSingleAwait() = runBlocking {
assertEquals("OK", Flux.just("O").awaitSingle() + "K")
}
@Test
fun testSingleEmitAndAwait() {
val flux = flux {
send(Flux.just("O").awaitSingle() + "K")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testSingleWithDelay() {
val flux = flux {
send(Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testSingleException() {
val flux = flux {
send(Flux.just("O", "K").awaitSingle() + "K")
}
checkErroneous(flux) {
assert(it is IllegalArgumentException)
}
}
@Test
fun testAwaitSingleOrDefault() {
val flux = flux {
send(Flux.empty<String>().awaitSingleOrDefault("O") + "K")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testAwaitSingleOrDefaultException() {
val flux = flux {
send(Flux.just("O", "#").awaitSingleOrDefault("!") + "K")
}
checkErroneous(flux) {
assert(it is IllegalArgumentException)
}
}
@Test
fun testAwaitSingleOrNull() {
val flux = flux<String?> {
send(Flux.empty<String>().awaitSingleOrNull() ?: "OK")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testAwaitSingleOrNullException() {
val flux = flux {
send((Flux.just("O", "#").awaitSingleOrNull() ?: "!") + "K")
}
checkErroneous(flux) {
assert(it is IllegalArgumentException)
}
}
@Test
fun testAwaitSingleOrElse() {
val flux = flux {
send(Flux.empty<String>().awaitSingleOrElse { "O" } + "K")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testAwaitSingleOrElseException() {
val flux = flux {
send(Flux.just("O", "#").awaitSingleOrElse { "!" } + "K")
}
checkErroneous(flux) {
assert(it is IllegalArgumentException)
}
}
@Test
fun testAwaitFirst() {
val flux = flux {
send(Flux.just("O", "#").awaitFirst() + "K")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testAwaitFirstOrDefault() {
val flux = flux {
send(Flux.empty<String>().awaitFirstOrDefault("O") + "K")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testAwaitFirstOrDefaultWithValues() {
val flux = flux {
send(Flux.just("O", "#").awaitFirstOrDefault("!") + "K")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testAwaitFirstOrNull() {
val flux = flux<String?> {
send(Flux.empty<String>().awaitFirstOrNull() ?: "OK")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testAwaitFirstOrNullWithValues() {
val flux = flux {
send((Flux.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testAwaitFirstOrElse() {
val flux = flux {
send(Flux.empty<String>().awaitFirstOrElse { "O" } + "K")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testAwaitFirstOrElseWithValues() {
val flux = flux {
send(Flux.just("O", "#").awaitFirstOrElse { "!" } + "K")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testAwaitLast() {
val flux = flux {
send(Flux.just("#", "O").awaitLast() + "K")
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testExceptionFromObservable() {
val flux = flux {
try {
send(Flux.error<String>(RuntimeException("O")).awaitFirst())
} catch (e: RuntimeException) {
send(Flux.just(e.message!!).awaitLast() + "K")
}
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testExceptionFromCoroutine() {
val flux = flux<String> {
throw IllegalStateException(Flux.just("O").awaitSingle() + "K")
}
checkErroneous(flux) {
assert(it is IllegalStateException)
assertEquals("OK", it.message)
}
}
@Test
fun testFluxIteration() {
val flux = flux {
var result = ""
Flux.just("O", "K").collect { result += it }
send(result)
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
@Test
fun testFluxIterationFailure() {
val flux = flux {
try {
Flux.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
send("Fail")
} catch (e: RuntimeException) {
send(e.message!!)
}
}
checkSingleValue(flux) {
assertEquals("OK", it)
}
}
}