blob: e022ff1529ae8992277086391756783708e62d87 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*
import org.reactivestreams.*
class PublishTest : TestBase() {
@Test
fun testBasicEmpty() = runTest {
expect(1)
val publisher = publish<Int> {
expect(5)
}
expect(2)
publisher.subscribe(object : Subscriber<Int> {
override fun onSubscribe(s: Subscription?) { expect(3) }
override fun onNext(t: Int?) { expectUnreached() }
override fun onComplete() { expect(6) }
override fun onError(t: Throwable?) { expectUnreached() }
})
expect(4)
yield() // to publish coroutine
finish(7)
}
@Test
fun testBasicSingle() = runTest {
expect(1)
val publisher = publish {
expect(5)
send(42)
expect(7)
}
expect(2)
publisher.subscribe(object : Subscriber<Int> {
override fun onSubscribe(s: Subscription) {
expect(3)
s.request(1)
}
override fun onNext(t: Int) {
expect(6)
assertThat(t, IsEqual(42))
}
override fun onComplete() { expect(8) }
override fun onError(t: Throwable?) { expectUnreached() }
})
expect(4)
yield() // to publish coroutine
finish(9)
}
@Test
fun testBasicError() = runTest {
expect(1)
val publisher = publish<Int>(NonCancellable) {
expect(5)
throw RuntimeException("OK")
}
expect(2)
publisher.subscribe(object : Subscriber<Int> {
override fun onSubscribe(s: Subscription) {
expect(3)
s.request(1)
}
override fun onNext(t: Int) { expectUnreached() }
override fun onComplete() { expectUnreached() }
override fun onError(t: Throwable) {
expect(6)
assertThat(t, IsInstanceOf(RuntimeException::class.java))
assertThat(t.message, IsEqual("OK"))
}
})
expect(4)
yield() // to publish coroutine
finish(7)
}
@Test
fun testCancelsParentOnFailure() = runTest(
expected = { it is RuntimeException && it.message == "OK" }
) {
// has parent, so should cancel it on failure
publish<Unit> {
throw RuntimeException("OK")
}.openSubscription()
}
@Test
fun testHandleFailureAfterCancel() = runTest(
unhandled = listOf({ it -> it is RuntimeException && it.message == "FAILED" })
){
expect(1)
// Exception should be delivered to CoroutineExceptionHandler, because we create publisher
// with the NonCancellable parent
val publisher = publish<Unit>(NonCancellable + Dispatchers.Unconfined) {
try {
expect(3)
delay(10000)
} finally {
expect(5)
throw RuntimeException("FAILED") // crash after cancel
}
}
var sub: Subscription? = null
publisher.subscribe(object : Subscriber<Unit> {
override fun onComplete() {
expectUnreached()
}
override fun onSubscribe(s: Subscription) {
expect(2)
sub = s
}
override fun onNext(t: Unit?) {
expectUnreached()
}
override fun onError(t: Throwable?) {
expectUnreached()
}
})
expect(4)
sub!!.cancel()
finish(6)
}
@Test
fun testParentHandlesFailure() = runTest {
expect(1)
val deferred = CompletableDeferred<Unit>()
val publisher = publish<Unit>(deferred + Dispatchers.Unconfined) {
try {
expect(3)
delay(10000)
} finally {
expect(5)
throw TestException("FAILED")
}
}
var sub: Subscription? = null
publisher.subscribe(object : Subscriber<Unit> {
override fun onComplete() {
expectUnreached()
}
override fun onSubscribe(s: Subscription) {
expect(2)
sub = s
}
override fun onNext(t: Unit?) {
expectUnreached()
}
override fun onError(t: Throwable?) {
expectUnreached()
}
})
expect(4)
sub!!.cancel()
try {
deferred.await()
expectUnreached()
} catch (e: TestException) {
expect(6)
}
finish(7)
}
@Test
fun testPublishFailureCancelsParent() = runTest(
expected = { it is TestException }
) {
expect(1)
val publisher = publish<Unit> {
expect(5)
throw TestException()
}
expect(2)
publisher.subscribe(object : Subscriber<Unit> {
override fun onComplete() {
expectUnreached()
}
override fun onSubscribe(s: Subscription) {
expect(3)
}
override fun onNext(t: Unit?) {
expectUnreached()
}
override fun onError(t: Throwable?) {
assertTrue(t is TestException)
expect(6)
}
})
expect(4)
try {
yield() // to coroutine, will crash because it is a cancelled parent coroutine
} finally {
finish(7)
}
expectUnreached()
}
@Test
fun testOnNextError() = runTest {
expect(1)
val publisher = publish<String>(NonCancellable) {
expect(4)
try {
send("OK")
} catch(e: Throwable) {
expect(6)
assert(e is TestException)
}
}
expect(2)
val latch = CompletableDeferred<Unit>()
publisher.subscribe(object : Subscriber<String> {
override fun onComplete() {
expectUnreached()
}
override fun onSubscribe(s: Subscription) {
expect(3)
s.request(1)
}
override fun onNext(t: String) {
expect(5)
assertEquals("OK", t)
throw TestException()
}
override fun onError(t: Throwable) {
expect(7)
assert(t is TestException)
latch.complete(Unit)
}
})
latch.await()
finish(8)
}
@Test
fun testFailingConsumer() = runTest {
val pub = publish {
repeat(3) {
expect(it + 1) // expect(1), expect(2) *should* be invoked
send(it)
}
}
try {
pub.collect {
throw TestException()
}
} catch (e: TestException) {
finish(3)
}
}
}