blob: 0ccc9cf9167b95c053db6a5591e7fe6021bb4b78 [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental.rx
import org.junit.Assert.*
import org.junit.Test
import rx.Observable
import java.util.concurrent.TimeUnit
/**
 * Tests for the coroutine/RxJava bridge.
 *
 * Every test builds an observable with [rxSingle], awaits one or more RxJava observables inside
 * the coroutine body, and then verifies the outcome through one of the two blocking helpers at
 * the bottom of this class.
 *
 * Type checks use JUnit's `assertTrue` rather than Kotlin's `assert`: the latter is only
 * evaluated when JVM assertions are enabled (`-ea`), so it can silently turn a test into a no-op.
 */
class AsyncRxTest {
    /** Awaiting a single-element observable yields that element to the coroutine. */
    @Test
    fun testSingle() {
        val observable = rxSingle {
            Observable.just("O").awaitSingle() + "K"
        }

        checkObservableWithSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /** Same as [testSingle], but the awaited element is emitted by a 50 ms timer. */
    @Test
    fun testSingleWithDelay() {
        val observable = rxSingle {
            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
        }

        checkObservableWithSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /**
     * `awaitSingle` on an observable with two elements is expected to fail the coroutine with an
     * [IllegalArgumentException].
     */
    @Test
    fun testSingleException() {
        val observable = rxSingle {
            Observable.just("O", "K").awaitSingle() + "K"
        }

        checkErroneousObservable(observable) {
            assertTrue("Expected IllegalArgumentException but was: $it", it is IllegalArgumentException)
        }
    }

    /** `awaitFirst` picks the first of several elements. */
    @Test
    fun testAwaitFirst() {
        val observable = rxSingle {
            Observable.just("O", "#").awaitFirst() + "K"
        }

        checkObservableWithSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast` picks the last of several elements. */
    @Test
    fun testAwaitLast() {
        val observable = rxSingle {
            Observable.just("#", "O").awaitLast() + "K"
        }

        checkObservableWithSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /**
     * An error signalled by the awaited observable can be caught inside the coroutine with an
     * ordinary `try`/`catch`, after which the coroutine continues and produces a value.
     */
    @Test
    fun testExceptionFromObservable() {
        val observable = rxSingle {
            try {
                Observable.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                // The message of the caught exception supplies the "O" of the expected "OK".
                Observable.just(e.message!!).awaitLast() + "K"
            }
        }

        checkObservableWithSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /**
     * An exception thrown by the coroutine body itself (here via `error(...)`, which throws
     * [IllegalStateException]) terminates the resulting observable with that exception.
     */
    @Test
    fun testExceptionFromCoroutine() {
        val observable = rxSingle {
            error(Observable.just("O").awaitSingle() + "K")
        }

        checkErroneousObservable(observable) {
            assertTrue("Expected IllegalStateException but was: $it", it is IllegalStateException)
            assertEquals("OK", it.message)
        }
    }

    /** `applyForEachAndAwait` passes every element to the action before the coroutine resumes. */
    @Test
    fun testApplyForEachAndWait() {
        val observable = rxSingle {
            var result = ""
            Observable.just("O", "K").applyForEachAndAwait {
                result += it
            }
            result
        }

        checkObservableWithSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /**
     * `applyForEachAndAwait` on a failing observable never invokes the action and rethrows the
     * error into the coroutine, where it can be caught.
     */
    @Test
    fun testApplyForEachAndWaitException() {
        val observable = rxSingle {
            try {
                Observable.error<String>(RuntimeException("OK")).applyForEachAndAwait {
                    fail("Should not be here")
                }
                // Reached only if the error was swallowed; makes the final assertion fail.
                "Fail"
            } catch (e: RuntimeException) {
                e.message!!
            }
        }

        checkObservableWithSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /**
     * Blocks until [observable] terminates, asserts that it produced exactly one notification
     * and that this notification is an error, then passes the error to [checker].
     *
     * @param observable the observable expected to fail without emitting any value
     * @param checker assertions to run against the throwable that terminated [observable]
     */
    private fun checkErroneousObservable(
        observable: Observable<*>,
        checker: (Throwable) -> Unit
    ) {
        // materialize() converts the terminal event into a Notification value, so the failure
        // can be inspected here instead of being rethrown by the blocking call.
        val singleNotification = observable.materialize().toBlocking().single()

        // Without this guard a plain completion would hand a null throwable to the checker and
        // surface as an unrelated null error rather than a readable test failure.
        assertTrue(
            "Expected the observable to terminate with an error but got: $singleNotification",
            singleNotification.isOnError
        )
        checker(singleNotification.throwable)
    }

    /**
     * Blocks until [observable] completes, requires it to have emitted exactly one value, and
     * passes that value to [checker].
     *
     * @param observable the observable expected to emit a single value and complete
     * @param checker assertions to run against the emitted value
     */
    private fun <T> checkObservableWithSingleValue(
        observable: Observable<T>,
        checker: (T) -> Unit
    ) {
        val singleValue = observable.toBlocking().single()
        checker(singleValue)
    }
}