blob: 168284321dc33920a86d9ccee0e0f0884bf76968 [file] [log] [blame]
package kotlinx.coroutines.experimental.reactor
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
fun <T> checkMonoValue(
mono: Mono<T>,
checker: (T) -> Unit
) {
val monoValue = mono.block()
checker(monoValue)
}
fun checkErroneous(
mono: Mono<*>,
checker: (Throwable) -> Unit
) {
try {
mono.block()
error("Should have failed")
} catch (e: Throwable) {
checker(e)
}
}
fun <T> checkSingleValue(
flux: Flux<T>,
checker: (T) -> Unit
) {
val singleValue = flux.toIterable().single()
checker(singleValue)
}
fun checkErroneous(
flux: Flux<*>,
checker: (Throwable) -> Unit
) {
val singleNotification = flux.materialize().toIterable().single()
checker(singleNotification.throwable)
}