Add support for Rx2 Maybe. (#45)
Add support for Rx2 Maybe
diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md
index e0a5ed6..11b3381 100644
--- a/reactive/kotlinx-coroutines-rx2/README.md
+++ b/reactive/kotlinx-coroutines-rx2/README.md
@@ -7,6 +7,7 @@
| **Name** | **Result** | **Scope** | **Description**
| --------------- | --------------------------------------- | ---------------- | ---------------
| [rxCompletable] | `Completable` | [CoroutineScope] | Cold completable that starts coroutine on subscribe
+| [rxMaybe] | `Maybe` | [CoroutineScope] | Cold maybe that starts coroutine on subscribe
| [rxSingle] | `Single` | [CoroutineScope] | Cold single that starts coroutine on subscribe
| [rxObservable] | `Observable` | [ProducerScope] | Cold observable that starts coroutine on subscribe
| [rxFlowable] | `Flowable` | [ProducerScope] | Cold observable that starts coroutine on subscribe with **backpressure** support
@@ -16,6 +17,9 @@
| **Name** | **Description**
| -------- | ---------------
| [CompletableSource.await][io.reactivex.CompletableSource.await] | Awaits for completion of the completable value
+| [MaybeSource.await][io.reactivex.MaybeSource.await] | Awaits for the value of the maybe and returns it or null
+| [MaybeSource.awaitOrDefault][io.reactivex.MaybeSource.awaitOrDefault] | Awaits for the value of the maybe and returns it or default
+| [MaybeSource.open][io.reactivex.MaybeSource.open] | Subscribes to maybe and returns [ReceiveChannel]
| [SingleSource.await][io.reactivex.SingleSource.await] | Awaits for completion of the single value and returns it
| [ObservableSource.awaitFirst][io.reactivex.ObservableSource.awaitFirst] | Awaits for the first value from the given observable
| [ObservableSource.awaitFirstOrDefault][io.reactivex.ObservableSource.awaitFirstOrDefault] | Awaits for the first value from the given observable or default
@@ -50,10 +54,14 @@
<!--- DOCS_ROOT reactive/kotlinx-coroutines-rx2/target/dokka/kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.experimental.rx2 -->
[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-completable.html
+[rxMaybe]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-maybe.html
[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-single.html
[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-observable.html
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-flowable.html
[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-completable-source/await.html
+[io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-maybe-source/await.html
+[io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-maybe-source/await-or-default.html
+[io.reactivex.MaybeSource.open]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-maybe-source/open.html
[io.reactivex.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-single-source/await.html
[io.reactivex.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first.html
[io.reactivex.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first-or-default.html
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt
index 9f4a30f..50594a3 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt
@@ -40,6 +40,38 @@
})
}
+// ------------------------ MaybeSource ------------------------
+
+/**
+ * Suspends, without blocking a thread, until this maybe terminates. Returns the emitted value,
+ * `null` if the maybe completed without a value, or throws the exception signalled by the maybe.
+ * This suspending function is cancellable: if the [Job] of the current coroutine is completed while it
+ * is waiting, it immediately resumes with [CancellationException].
+ */
+public suspend fun <T> MaybeSource<T>.await(): T? {
+    @Suppress("UNCHECKED_CAST") // the element type is widened to T? only so that `null` can be the default
+    val nullableSource = this as MaybeSource<T?>
+    return nullableSource.awaitOrDefault(null)
+}
+
+/**
+ * Suspends, without blocking a thread, until this maybe terminates. Returns the emitted value,
+ * [default] if the maybe completed without a value, or throws the exception signalled by the maybe.
+ *
+ * This suspending function is cancellable: if the [Job] of the current coroutine is completed while it
+ * is waiting, it immediately resumes with [CancellationException].
+ */
+public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T =
+    suspendCancellableCoroutine { continuation ->
+        subscribe(object : MaybeObserver<T> {
+            // hand the disposable to the continuation (see disposeOnCompletion)
+            override fun onSubscribe(disposable: Disposable) { continuation.disposeOnCompletion(disposable) }
+            override fun onSuccess(value: T) { continuation.resume(value) }
+            override fun onComplete() { continuation.resume(default) }
+            override fun onError(cause: Throwable) { continuation.resumeWithException(cause) }
+        })
+    }
+
// ------------------------ SingleSource ------------------------
/**
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt
index e58116c..0751453 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt
@@ -16,6 +16,8 @@
package kotlinx.coroutines.experimental.rx2
+import io.reactivex.MaybeObserver
+import io.reactivex.MaybeSource
import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.Observer
@@ -25,7 +27,7 @@
import java.io.Closeable
/**
- * Return type for [ObservableSource.open] that can be used to [receive] elements from the
+ * Return type for [ObservableSource.open] and [MaybeSource.open] that can be used to [receive] elements from the
* subscription and to manually [close] it.
*/
public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
@@ -36,6 +38,15 @@
}
/**
+ * Subscribes to this [MaybeSource] and returns a channel that receives the value it emits, if any.
+ * The returned channel is the subscriber itself, so it can also be used to manually close the subscription.
+ */
+public fun <T> MaybeSource<T>.open(): SubscriptionReceiveChannel<T> =
+    SubscriptionChannel<T>().also { maybeObserver ->
+        subscribe(maybeObserver)
+    }
+
+/**
* Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it.
*/
public fun <T> ObservableSource<T>.open(): SubscriptionReceiveChannel<T> {
@@ -58,6 +69,16 @@
public operator fun <T> ObservableSource<T>.iterator() = open().iterator()
/**
+ * Subscribes to this [MaybeSource], runs [action] for each received element and then closes the subscription.
+ */
+// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
+public suspend fun <T> MaybeSource<T>.consumeEach(action: suspend (T) -> Unit) {
+    open().use { subscription ->
+        for (element in subscription) action(element)
+    }
+}
+
+/**
* Subscribes to this [ObservableSource] and performs the specified action for each received element.
*/
// :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
@@ -67,7 +88,7 @@
}
}
-private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Observer<T> {
+private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Observer<T>, MaybeObserver<T> {
@Volatile
var subscription: Disposable? = null
@@ -86,6 +107,10 @@
subscription = sub
}
+    override fun onSuccess(t: T) {
+        offer(t)
+        close() // onSuccess is terminal for a Maybe (no onComplete follows), so end the channel here
+    }
override fun onNext(t: T) {
offer(t)
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxConvert.kt
index ffc9404..648cdad 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxConvert.kt
@@ -17,14 +17,12 @@
package kotlinx.coroutines.experimental.rx2
import io.reactivex.Completable
+import io.reactivex.Maybe
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.ReceiveChannel
-import kotlinx.coroutines.experimental.launch
import io.reactivex.Observable
import io.reactivex.Single
-import io.reactivex.functions.Cancellable
-import org.reactivestreams.Subscription
import kotlin.coroutines.experimental.CoroutineContext
/**
@@ -41,6 +39,19 @@
}
/**
+ * Converts this deferred value to the hot reactive maybe that signals
+ * [onSuccess][MaybeEmitter.onSuccess] for a non-null result, [onComplete][MaybeEmitter.onComplete] for `null`
+ * or [onError][MaybeEmitter.onError] when the deferred value fails.
+ *
+ * Every subscriber gets the same completion value.
+ * Unsubscribing from the resulting maybe **does not** affect the original deferred value in any way.
+ * @param context -- the coroutine context from which the resulting maybe is going to be signalled
+ */
+public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = let { deferred ->
+    rxMaybe<T>(context) { deferred.await() }
+}
+
+/**
* Converts this deferred value to the hot reactive single that signals either
* [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
*
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxMaybe.kt
new file mode 100644
index 0000000..787a5eb
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxMaybe.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.rx2
+
+import io.reactivex.Maybe
+import io.reactivex.MaybeEmitter
+import io.reactivex.functions.Cancellable
+import kotlinx.coroutines.experimental.AbstractCoroutine
+import kotlinx.coroutines.experimental.CoroutineScope
+import kotlinx.coroutines.experimental.Job
+import kotlinx.coroutines.experimental.newCoroutineContext
+import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.startCoroutine
+
+/**
+ * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine.
+ * Every time the returned maybe is subscribed, it starts a new coroutine in the specified [context].
+ * Coroutine returns a single, possibly null value. Unsubscribing cancels the running coroutine,
+ * and a subscriber that is already disposed receives no signal.
+ *
+ * | **Coroutine action**     | **Signal to subscriber**
+ * | ------------------------ | ------------------------
+ * | Returns a non-null value | `onSuccess`
+ * | Returns a null           | `onComplete`
+ * | Failure with exception   | `onError`
+ */
+public fun <T> rxMaybe(
+    context: CoroutineContext,
+    block: suspend CoroutineScope.() -> T?
+): Maybe<T> = Maybe.create { emitter ->
+    val coroutine = RxMaybeCoroutine(newCoroutineContext(context), emitter)
+    coroutine.initParentJob(context[Job])
+    emitter.setCancellable(coroutine)
+    block.startCoroutine(receiver = coroutine, completion = coroutine)
+}
+
+private class RxMaybeCoroutine<T>(
+    override val parentContext: CoroutineContext,
+    private val subscriber: MaybeEmitter<T>
+) : AbstractCoroutine<T>(true), Cancellable {
+    // Translates the final coroutine state into a single Maybe signal, unless the subscriber is disposed.
+    @Suppress("UNCHECKED_CAST")
+    override fun afterCompletion(state: Any?, mode: Int) {
+        if (subscriber.isDisposed) return
+        when (state) {
+            is CompletedExceptionally -> subscriber.onError(state.exception)
+            null -> subscriber.onComplete()
+            else -> subscriber.onSuccess(state as T)
+        }
+    }
+    // Cancellable impl: cancels the coroutine without a cause
+    override fun cancel() { cancel(cause = null) }
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/Check.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/Check.kt
index 05f1a30..b6deb48 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/Check.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/Check.kt
@@ -16,6 +16,7 @@
package kotlinx.coroutines.experimental.rx2
+import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single
@@ -55,3 +56,23 @@
}
}
+/** Blocks until [maybe] terminates and passes its value, or `null` if it emitted none, to [checker]. */
+fun <T> checkMaybeValue(maybe: Maybe<T>, checker: (T?) -> Unit) {
+    val emitted: T? = maybe.toFlowable()
+        .blockingIterable()
+        .firstOrNull()
+    checker(emitted)
+}
+
+/** Blocks on [maybe], which is expected to fail, and passes the failure to [checker]. */
+fun checkErroneous(maybe: Maybe<*>, checker: (Throwable) -> Unit) {
+    val failure = try {
+        maybe.blockingGet()
+        null // the maybe terminated without an error
+    } catch (e: Throwable) {
+        e
+    }
+    // Raised outside the try so a missing failure is never handed to checker as if it were the expected one.
+    checker(failure ?: error("Should have failed"))
+}
+
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ConvertTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ConvertTest.kt
index c22ef77..7649391 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ConvertTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ConvertTest.kt
@@ -19,6 +19,7 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.produce
import org.junit.Assert.assertEquals
+import org.junit.Assert.assertNull
import org.junit.Test
class ConvertTest : TestBase() {
@@ -56,6 +57,50 @@
}
@Test
+    fun testToMaybe() {
+        val deferred = async(CommonPool) {
+            delay(50)
+            "OK"
+        }
+        // The same deferred is converted and checked twice:
+        // every conversion must deliver its value.
+        repeat(2) {
+            val maybe = deferred.asMaybe(Unconfined)
+            checkMaybeValue(maybe) { value ->
+                assertEquals("OK", value)
+            }
+        }
+    }
+
+ @Test
+    fun testToMaybeEmpty() {
+        val deferred = async(CommonPool) {
+            delay(50)
+            null
+        }
+        repeat(2) { // a null result must show up as "no value" on every conversion
+            val maybe = deferred.asMaybe(Unconfined)
+            checkMaybeValue(maybe, ::assertNull)
+        }
+    }
+
+ @Test
+    fun testToMaybeFail() {
+        val deferred = async(CommonPool) {
+            delay(50)
+            throw TestException("OK")
+        }
+        // The same failed deferred is converted and checked twice:
+        // every conversion must report its exception.
+        repeat(2) {
+            val maybe = deferred.asMaybe(Unconfined)
+            checkErroneous(maybe) { failure ->
+                check(failure is TestException && failure.message == "OK") { "$failure" }
+            }
+        }
+    }
+
+ @Test
fun testToSingle() {
val d = async(CommonPool) {
delay(50)
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/MaybeTest.kt
new file mode 100644
index 0000000..628441f
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/MaybeTest.kt
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.rx2
+
+import io.reactivex.Maybe
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.runBlocking
+import org.junit.Assert.assertEquals
+import org.junit.Test
+import io.reactivex.Observable
+import io.reactivex.functions.Action
+import io.reactivex.internal.functions.Functions.ON_ERROR_MISSING
+import io.reactivex.internal.functions.Functions.emptyConsumer
+import kotlinx.coroutines.experimental.TestBase
+import kotlinx.coroutines.experimental.yield
+import org.hamcrest.core.IsEqual
+import org.hamcrest.core.IsInstanceOf
+import org.junit.Assert
+import org.junit.Assert.assertNull
+import java.util.concurrent.TimeUnit
+
+/** Tests for the [rxMaybe] builder and for awaiting Rx sources from inside its coroutine. */
+class MaybeTest : TestBase() {
+    @Test
+    fun testBasicSuccess() = runBlocking<Unit> {
+        expect(1)
+        val maybe = rxMaybe(context) {
+            expect(4)
+            "OK"
+        }
+        expect(2)
+        maybe.subscribe { value ->
+            expect(5)
+            Assert.assertThat(value, IsEqual("OK"))
+        }
+        expect(3)
+        yield() // to started coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicEmpty() = runBlocking<Unit> {
+        expect(1)
+        val maybe = rxMaybe(context) {
+            expect(4)
+            null
+        }
+        expect(2)
+        maybe.subscribe(emptyConsumer(), ON_ERROR_MISSING, Action {
+            expect(5)
+        })
+        expect(3)
+        yield() // to started coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicFailure() = runBlocking<Unit> {
+        expect(1)
+        val maybe = rxMaybe(context) {
+            expect(4)
+            throw RuntimeException("OK")
+        }
+        expect(2)
+        maybe.subscribe({
+            expectUnreached()
+        }, { error ->
+            expect(5)
+            Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java))
+            Assert.assertThat(error.message, IsEqual("OK"))
+        })
+        expect(3)
+        yield() // to started coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicUnsubscribe() = runBlocking<Unit> {
+        expect(1)
+        val maybe = rxMaybe(context) {
+            expect(4)
+            yield() // back to main, will get cancelled
+            expectUnreached()
+        }
+        expect(2)
+        // nothing is called on a disposed rx2 maybe
+        val sub = maybe.subscribe({
+            expectUnreached()
+        }, {
+            expectUnreached()
+        })
+        expect(3)
+        yield() // to started coroutine
+        expect(5)
+        sub.dispose() // will cancel coroutine
+        yield()
+        finish(6)
+    }
+
+    @Test
+    fun testMaybeNoWait() {
+        val maybe = rxMaybe(CommonPool) {
+            "OK"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testMaybeAwait() = runBlocking<Unit> {
+        assertEquals("OK", Maybe.just("O").await() + "K")
+    }
+
+    @Test
+    fun testMaybeAwaitForNull() = runBlocking<Unit> {
+        assertNull(Maybe.empty<String>().await())
+    }
+
+    @Test
+    fun testMaybeEmitAndAwait() {
+        val maybe = rxMaybe(CommonPool) {
+            Maybe.just("O").await() + "K"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testMaybeWithDelay() {
+        val maybe = rxMaybe(CommonPool) {
+            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testMaybeException() {
+        val maybe = rxMaybe(CommonPool) {
+            Observable.just("O", "K").awaitSingle() + "K" // two elements: awaitSingle is expected to fail
+        }
+
+        checkErroneous(maybe) {
+            // assertThat always runs, unlike Kotlin's assert() which needs the JVM -ea flag
+            Assert.assertThat(it, IsInstanceOf(IllegalArgumentException::class.java))
+        }
+    }
+
+    @Test
+    fun testAwaitFirst() {
+        val maybe = rxMaybe(CommonPool) {
+            Observable.just("O", "#").awaitFirst() + "K"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testAwaitLast() {
+        val maybe = rxMaybe(CommonPool) {
+            Observable.just("#", "O").awaitLast() + "K"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testExceptionFromObservable() {
+        val maybe = rxMaybe(CommonPool) {
+            try {
+                Observable.error<String>(RuntimeException("O")).awaitFirst()
+            } catch (e: RuntimeException) {
+                Observable.just(e.message!!).awaitLast() + "K"
+            }
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testExceptionFromCoroutine() {
+        val maybe = rxMaybe<String>(CommonPool) {
+            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
+        }
+
+        checkErroneous(maybe) {
+            Assert.assertThat(it, IsInstanceOf(IllegalStateException::class.java))
+            assertEquals("OK", it.message)
+        }
+    }
+}