/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
| |
| package kotlinx.coroutines.reactor |
| |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.reactive.* |
| import org.hamcrest.core.* |
| import org.junit.* |
| import org.junit.Assert.* |
| import org.reactivestreams.* |
| import reactor.core.publisher.* |
| import java.time.Duration.* |
| |
/**
 * Tests for the [mono] coroutine builder and for awaiting Reactor publishers
 * (`awaitSingle`, `awaitFirst`, `awaitLast`) from inside coroutines.
 *
 * Several tests number their steps with `expect(n)` / `finish(n)`. Those helpers are inherited from
 * [TestBase] (not visible in this file) and presumably verify that the steps execute in exactly the
 * numbered order -- confirm against `TestBase`.
 */
class MonoTest : TestBase() {
    @Before
    fun setup() {
        // NOTE(review): presumably excludes Reactor's "timer-" and "parallel-" scheduler threads from
        // a leaked-thread check performed by TestBase -- confirm against TestBase.
        ignoreLostThreads("timer-", "parallel-")
    }

    /** A mono built on the current dispatcher delivers the value returned by its block to the subscriber. */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val mono = mono(currentDispatcher()) {
            expect(4)
            "OK"
        }
        expect(2)
        mono.subscribe { value ->
            expect(5)
            assertThat(value, IsEqual("OK"))
        }
        expect(3)
        yield() // to the started coroutine
        finish(6)
    }

    /** An exception thrown by the mono block reaches the subscriber's error callback, not the value callback. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val mono = mono(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        mono.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertThat(error, IsInstanceOf(RuntimeException::class.java))
            assertThat(error.message, IsEqual("OK"))
        })
        expect(3)
        yield() // to the started coroutine
        finish(6)
    }

    /** A mono block that returns `null` invokes the completion callback; any error is rethrown. */
    @Test
    fun testBasicEmpty() = runBlocking {
        expect(1)
        val mono = mono(currentDispatcher()) {
            expect(4)
            null
        }
        expect(2)
        mono.subscribe({}, { throw it }, {
            expect(5)
        })
        expect(3)
        yield() // to the started coroutine
        finish(6)
    }

    /** Disposing the subscription cancels the suspended coroutine; neither callback is invoked. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val mono = mono(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed mono
        val sub = mono.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to the started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** A mono whose block never suspends yields its value. */
    @Test
    fun testMonoNoWait() {
        val mono = mono {
            "OK"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` on `Mono.just` returns the emitted value. */
    @Test
    fun testMonoAwait() = runBlocking {
        assertEquals("OK", Mono.just("O").awaitSingle() + "K")
    }

    /** A mono block can await another Mono and build its result from the awaited value. */
    @Test
    fun testMonoEmitAndAwait() {
        val mono = mono {
            Mono.just("O").awaitSingle() + "K"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** Awaiting an element that is emitted after a 50 ms delay still produces the expected value. */
    @Test
    fun testMonoWithDelay() {
        val mono = mono {
            Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` on a Flux that emits two elements is expected to fail the mono with [IllegalArgumentException]. */
    @Test
    fun testMonoException() {
        val mono = mono {
            Flux.just("O", "K").awaitSingle() + "K"
        }

        checkErroneous(mono) {
            // JUnit's assertTrue instead of Kotlin's assert(): the latter is only evaluated when the
            // JVM runs with assertions enabled (-ea), so the check could be silently skipped.
            assertTrue(it is IllegalArgumentException)
        }
    }

    /** `awaitFirst` returns the first element of a multi-element Flux. */
    @Test
    fun testAwaitFirst() {
        val mono = mono {
            Flux.just("O", "#").awaitFirst() + "K"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast` returns the last element of a multi-element Flux. */
    @Test
    fun testAwaitLast() {
        val mono = mono {
            Flux.just("#", "O").awaitLast() + "K"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** An error signalled by an awaited Flux can be caught inside the mono block as a regular exception. */
    @Test
    fun testExceptionFromFlux() {
        val mono = mono {
            try {
                Flux.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                // The message is known to be "O" here because the exception was created just above.
                Flux.just(e.message!!).awaitLast() + "K"
            }
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** An exception thrown from the mono block after a suspension is delivered with its type and message intact. */
    @Test
    fun testExceptionFromCoroutine() {
        val mono = mono<String> {
            throw IllegalStateException(Flux.just("O").awaitSingle() + "K")
        }

        checkErroneous(mono) {
            // See testMonoException: assertTrue is evaluated regardless of the JVM's -ea flag.
            assertTrue(it is IllegalStateException)
            assertEquals("OK", it.message)
        }
    }

    /**
     * When a child coroutine fails with [TestException] and the parent then throws [TestException2]
     * from its `finally` block, awaiting the mono is expected to throw the child's exception with the
     * parent's exception attached as its first suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val mono = mono(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // but parent throws another exception while cleaning up
            }
        }
        try {
            mono.awaitSingle()
            expectUnreached()
        } catch (e: TestException) {
            assertTrue(e.suppressed[0] is TestException2)
        }
    }

    /**
     * A mono that cancels its own subscription and then throws cannot report the failure to the
     * subscriber; the exception is expected to reach the [CoroutineExceptionHandler] from the context.
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        var subscription: Subscription? = null
        val mono = mono(currentDispatcher() + CoroutineExceptionHandler { _, t ->
            assertTrue(t is TestException)
            expect(5)
        }) {
            expect(4)
            // cancel our own subscription, so that delay will get cancelled
            checkNotNull(subscription) { "onSubscribe must store the subscription before the coroutine runs" }
                .cancel()
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // would not be able to handle it since mono is disposed
            }
        }
        mono.subscribe(object : Subscriber<Unit> {
            override fun onSubscribe(s: Subscription) {
                expect(2)
                subscription = s
            }
            override fun onNext(t: Unit?) { expectUnreached() }
            override fun onComplete() { expectUnreached() }
            override fun onError(t: Throwable) { expectUnreached() }
        })
        expect(3)
        yield() // run coroutine
        finish(6)
    }

    /** Passing a [Job] as the context of the mono builder is expected to be rejected with [IllegalArgumentException]. */
    @Test
    fun testIllegalArgumentException() {
        assertFailsWith<IllegalArgumentException> { mono(Job()) { } }
    }
}