Add support for Rx2 Maybe. (#45)

Add support for Rx2 Maybe
diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md
index e0a5ed6..11b3381 100644
--- a/reactive/kotlinx-coroutines-rx2/README.md
+++ b/reactive/kotlinx-coroutines-rx2/README.md
@@ -7,6 +7,7 @@
 | **Name**        | **Result**                              | **Scope**        | **Description**
 | --------------- | --------------------------------------- | ---------------- | ---------------
 | [rxCompletable] | `Completable`                           | [CoroutineScope] | Cold completable that starts coroutine on subscribe
+| [rxMaybe]       | `Maybe`                                 | [CoroutineScope] | Cold maybe that starts coroutine on subscribe
 | [rxSingle]      | `Single`                                | [CoroutineScope] | Cold single that starts coroutine on subscribe
 | [rxObservable]  | `Observable`                            | [ProducerScope]  | Cold observable that starts coroutine on subscribe
 | [rxFlowable]    | `Flowable`                              | [ProducerScope]  | Cold observable that starts coroutine on subscribe with **backpressure** support 
@@ -16,6 +17,9 @@
 | **Name** | **Description**
 | -------- | ---------------
 | [CompletableSource.await][io.reactivex.CompletableSource.await] | Awaits for completion of the completable value 
+| [MaybeSource.await][io.reactivex.MaybeSource.await] | Awaits for the value of the maybe and returns it or null 
+| [MaybeSource.awaitOrDefault][io.reactivex.MaybeSource.awaitOrDefault] | Awaits for the value of the maybe and returns it or default 
+| [MaybeSource.open][io.reactivex.MaybeSource.open] | Subscribes to maybe and returns [ReceiveChannel] 
 | [SingleSource.await][io.reactivex.SingleSource.await] | Awaits for completion of the single value and returns it 
 | [ObservableSource.awaitFirst][io.reactivex.ObservableSource.awaitFirst] | Awaits for the first value from the given observable
 | [ObservableSource.awaitFirstOrDefault][io.reactivex.ObservableSource.awaitFirstOrDefault] | Awaits for the first value from the given observable or default
@@ -50,10 +54,14 @@
 <!--- DOCS_ROOT reactive/kotlinx-coroutines-rx2/target/dokka/kotlinx-coroutines-rx2 -->
 <!--- INDEX kotlinx.coroutines.experimental.rx2 -->
 [rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-completable.html
+[rxMaybe]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-maybe.html
 [rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-single.html
 [rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-observable.html
 [rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-flowable.html
 [io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-completable-source/await.html
+[io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-maybe-source/await.html
+[io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-maybe-source/await-or-default.html
+[io.reactivex.MaybeSource.open]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-maybe-source/open.html
 [io.reactivex.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-single-source/await.html
 [io.reactivex.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first.html
 [io.reactivex.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first-or-default.html
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt
index 9f4a30f..50594a3 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt
@@ -40,6 +40,38 @@
     })
 }
 
+// ------------------------ MaybeSource ------------------------
+
+/**
+ * Awaits completion of this maybe without blocking a thread.
+ * Returns the value it produced, `null` if it completed without a value, or throws the corresponding
+ * exception if it signalled an error. Implemented as [awaitOrDefault] with `null` as the default.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+@Suppress("UNCHECKED_CAST")
+public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)
+
+/**
+ * Suspends until this maybe terminates, without blocking a thread.
+ * Resumes with the emitted value, with [default] when the maybe completes without a value, or with the
+ * exception that the maybe signalled through `onError`.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { continuation ->
+    subscribe(object : MaybeObserver<T> {
+        override fun onSubscribe(d: Disposable) { continuation.disposeOnCompletion(d) }
+        override fun onSuccess(t: T) = continuation.resume(t)
+        override fun onComplete() = continuation.resume(default)
+        override fun onError(error: Throwable) = continuation.resumeWithException(error)
+    })
+}
+
 // ------------------------ SingleSource ------------------------
 
 /**
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt
index e58116c..0751453 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxChannel.kt
@@ -16,6 +16,8 @@
 
 package kotlinx.coroutines.experimental.rx2
 
+import io.reactivex.MaybeObserver
+import io.reactivex.MaybeSource
 import io.reactivex.Observable
 import io.reactivex.ObservableSource
 import io.reactivex.Observer
@@ -25,7 +27,7 @@
 import java.io.Closeable
 
 /**
- * Return type for [ObservableSource.open] that can be used to [receive] elements from the
+ * Return type for [ObservableSource.open] and [MaybeSource.open] that can be used to [receive] elements from the
  * subscription and to manually [close] it.
  */
 public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
@@ -36,6 +38,15 @@
 }
 
 /**
+ * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it.
+ */
+public fun <T> MaybeSource<T>.open(): SubscriptionReceiveChannel<T> =
+    SubscriptionChannel<T>().also { receiver ->
+        // the channel doubles as the MaybeObserver that feeds it
+        subscribe(receiver)
+    }
+
+/**
  * Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it.
  */
 public fun <T> ObservableSource<T>.open(): SubscriptionReceiveChannel<T> {
@@ -58,6 +69,16 @@
 public operator fun <T> ObservableSource<T>.iterator() = open().iterator()
 
 /**
+ * Subscribes to this [MaybeSource] and performs the specified action for each received element.
+ */
+// TODO: make this function inline once https://youtrack.jetbrains.com/issue/KT-16448 is fixed
+public suspend fun <T> MaybeSource<T>.consumeEach(action: suspend (T) -> Unit) {
+    open().use { subscription ->
+        for (element in subscription) action(element)
+    }
+}
+
+/**
  * Subscribes to this [ObservableSource] and performs the specified action for each received element.
  */
 // :todo: make it inline when this bug is fixed: https://youtrack.jetbrains.com/issue/KT-16448
@@ -67,7 +88,7 @@
     }
 }
 
-private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Observer<T> {
+private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T>, Observer<T>, MaybeObserver<T> {
     @Volatile
     var subscription: Disposable? = null
 
@@ -86,6 +107,10 @@
         subscription = sub
     }
 
+    // A maybe terminates with onSuccess alone and signals no onComplete afterwards, so the channel is
+    // finished here as well (assuming onComplete() closes it); otherwise receivers would keep waiting.
+    override fun onSuccess(t: T) { offer(t); onComplete() }
+
     override fun onNext(t: T) {
         offer(t)
     }
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxConvert.kt
index ffc9404..648cdad 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxConvert.kt
@@ -17,14 +17,12 @@
 package kotlinx.coroutines.experimental.rx2
 
 import io.reactivex.Completable
+import io.reactivex.Maybe
 import kotlinx.coroutines.experimental.Deferred
 import kotlinx.coroutines.experimental.Job
 import kotlinx.coroutines.experimental.channels.ReceiveChannel
-import kotlinx.coroutines.experimental.launch
 import io.reactivex.Observable
 import io.reactivex.Single
-import io.reactivex.functions.Cancellable
-import org.reactivestreams.Subscription
 import kotlin.coroutines.experimental.CoroutineContext
 
 /**
@@ -41,6 +39,19 @@
 }
 
 /**
+ * Converts this deferred value to the hot reactive maybe that signals
+ * [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
+ *
+ * Every subscriber gets the same completion value.
+ * Unsubscribing from the resulting maybe **does not** affect the original deferred value in any way.
+ *
+ * @param context -- the coroutine context from which the resulting maybe is going to be signalled
+ */
+public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> {
+    return rxMaybe(context) { this@asMaybe.await() }
+}
+
+/**
  * Converts this deferred value to the hot reactive single that signals either
  * [onSuccess][SingleSubscriber.onSuccess] or [onError][SingleSubscriber.onError].
  *
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxMaybe.kt
new file mode 100644
index 0000000..787a5eb
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxMaybe.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.rx2
+
+import io.reactivex.Maybe
+import io.reactivex.MaybeEmitter
+import io.reactivex.functions.Cancellable
+import kotlinx.coroutines.experimental.AbstractCoroutine
+import kotlinx.coroutines.experimental.CoroutineScope
+import kotlinx.coroutines.experimental.Job
+import kotlinx.coroutines.experimental.newCoroutineContext
+import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.startCoroutine
+
+/**
+ * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine.
+ * Every time the returned maybe is subscribed, it starts a new coroutine in the specified [context].
+ * Coroutine returns a single, possibly null value. Unsubscribing cancels running coroutine.
+ *
+ * | **Coroutine action**                  | **Signal to subscriber**
+ * | ------------------------------------- | ------------------------
+ * | Returns a non-null value              | `onSuccess`
+ * | Returns a null                        | `onComplete`
+ * | Failure with exception                | `onError` (nothing is signalled once the subscriber is disposed)
+ */
+public fun <T> rxMaybe(
+    context: CoroutineContext,
+    block: suspend CoroutineScope.() -> T?
+): Maybe<T> = Maybe.create { subscriber ->
+    val newContext = newCoroutineContext(context)
+    val coroutine = RxMaybeCoroutine(newContext, subscriber)
+    coroutine.initParentJob(context[Job])
+    subscriber.setCancellable(coroutine) // disposing the subscription invokes coroutine.cancel()
+    block.startCoroutine(coroutine, coroutine)
+}
+
+private class RxMaybeCoroutine<T>(
+    override val parentContext: CoroutineContext,
+    private val subscriber: MaybeEmitter<T>
+) : AbstractCoroutine<T>(true), Cancellable {
+    @Suppress("UNCHECKED_CAST")
+    override fun afterCompletion(state: Any?, mode: Int) {
+        if (subscriber.isDisposed) return // a disposed emitter receives no signal at all
+        when (state) {
+            null                      -> subscriber.onComplete()
+            is CompletedExceptionally -> subscriber.onError(state.exception)
+            else                      -> subscriber.onSuccess(state as T)
+        }
+    }
+
+    // Cancellable impl: invoked by the emitter when the subscription is disposed
+    override fun cancel() { cancel(cause = null) }
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/Check.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/Check.kt
index 05f1a30..b6deb48 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/Check.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/Check.kt
@@ -16,6 +16,7 @@
 
 package kotlinx.coroutines.experimental.rx2
 
+import io.reactivex.Maybe
 import io.reactivex.Observable
 import io.reactivex.Single
 
@@ -55,3 +56,23 @@
     }
 }
 
+fun <T> checkMaybeValue(
+        maybe: Maybe<T>,
+        checker: (T?) -> Unit
+) {
+    val emitted = maybe.toFlowable().blockingIterable()
+    checker(emitted.firstOrNull())
+}
+
+fun checkErroneous(maybe: Maybe<*>, checker: (Throwable) -> Unit) {
+    // Decide after the try block: raising "Should have failed" inside it would be caught by the
+    // same catch and handed to the checker as if the maybe itself had failed with it.
+    val failure: Throwable? = try {
+        maybe.blockingGet()
+        null
+    } catch (e: Throwable) {
+        e
+    }
+    checker(failure ?: error("Should have failed"))
+}
+
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ConvertTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ConvertTest.kt
index c22ef77..7649391 100644
--- a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ConvertTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/ConvertTest.kt
@@ -19,6 +19,7 @@
 import kotlinx.coroutines.experimental.*
 import kotlinx.coroutines.experimental.channels.produce
 import org.junit.Assert.assertEquals
+import org.junit.Assert.assertNull
 import org.junit.Test
 
 class ConvertTest : TestBase() {
@@ -56,6 +57,50 @@
     }
 
     @Test
+    fun testToMaybe() {
+        val deferred = async(CommonPool) {
+            delay(50)
+            "OK"
+        }
+        val first = deferred.asMaybe(Unconfined)
+        checkMaybeValue(first) { value ->
+            assertEquals("OK", value)
+        }
+        val second = deferred.asMaybe(Unconfined)
+        checkMaybeValue(second) { value ->
+            assertEquals("OK", value)
+        }
+    }
+
+    @Test
+    fun testToMaybeEmpty() {
+        val deferred = async(CommonPool) {
+            delay(50)
+            null
+        }
+        val first = deferred.asMaybe(Unconfined)
+        checkMaybeValue(first, ::assertNull)
+        val second = deferred.asMaybe(Unconfined)
+        checkMaybeValue(second, ::assertNull)
+    }
+
+    @Test
+    fun testToMaybeFail() {
+        val deferred = async(CommonPool) {
+            delay(50)
+            throw TestException("OK")
+        }
+        val first = deferred.asMaybe(Unconfined)
+        checkErroneous(first) { failure ->
+            check(failure is TestException && failure.message == "OK") { "$failure" }
+        }
+        val second = deferred.asMaybe(Unconfined)
+        checkErroneous(second) { failure ->
+            check(failure is TestException && failure.message == "OK") { "$failure" }
+        }
+    }
+
+    @Test
     fun testToSingle() {
         val d = async(CommonPool) {
             delay(50)
diff --git a/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/MaybeTest.kt
new file mode 100644
index 0000000..628441f
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/src/test/kotlin/kotlinx/coroutines/experimental/rx2/MaybeTest.kt
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.rx2
+
+import io.reactivex.Maybe
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.runBlocking
+import org.junit.Assert.assertEquals
+import org.junit.Test
+import io.reactivex.Observable
+import io.reactivex.functions.Action
+import io.reactivex.internal.functions.Functions.ON_ERROR_MISSING
+import io.reactivex.internal.functions.Functions.emptyConsumer
+import kotlinx.coroutines.experimental.TestBase
+import kotlinx.coroutines.experimental.yield
+import org.hamcrest.core.IsEqual
+import org.hamcrest.core.IsInstanceOf
+import org.junit.Assert
+import org.junit.Assert.assertNull
+import java.util.concurrent.TimeUnit
+
+class MaybeTest : TestBase() {
+    @Test
+    fun testBasicSuccess() = runBlocking<Unit> {
+        expect(1)
+        val maybe = rxMaybe(context) {
+            expect(4)
+            "OK"
+        }
+        expect(2)
+        maybe.subscribe { value ->
+            expect(5)
+            Assert.assertThat(value, IsEqual("OK"))
+        }
+        expect(3)
+        yield() // yield to the started coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicEmpty() = runBlocking<Unit> {
+        expect(1)
+        val maybe = rxMaybe(context) {
+            expect(4)
+            null
+        }
+        expect(2)
+        maybe.subscribe(emptyConsumer(), ON_ERROR_MISSING, Action {
+            expect(5)
+        })
+        expect(3)
+        yield() // yield to the started coroutine
+        finish(6)
+    }
+
+    @Test
+    fun testBasicFailure() = runBlocking<Unit> {
+        expect(1)
+        val maybe = rxMaybe(context) {
+            expect(4)
+            throw RuntimeException("OK")
+        }
+        expect(2)
+        maybe.subscribe({
+            expectUnreached()
+        }, { error ->
+            expect(5)
+            Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java))
+            Assert.assertThat(error.message, IsEqual("OK"))
+        })
+        expect(3)
+        yield() // yield to the started coroutine
+        finish(6)
+    }
+
+
+    @Test
+    fun testBasicUnsubscribe() = runBlocking<Unit> {
+        expect(1)
+        val maybe = rxMaybe(context) {
+            expect(4)
+            yield() // back to main, will get cancelled
+            expectUnreached()
+        }
+        expect(2)
+        // nothing is called on a disposed rx2 maybe
+        val sub = maybe.subscribe({
+            expectUnreached()
+        }, {
+            expectUnreached()
+        })
+        expect(3)
+        yield() // yield to the started coroutine
+        expect(5)
+        sub.dispose() // will cancel coroutine
+        yield()
+        finish(6)
+    }
+
+    @Test
+    fun testMaybeNoWait() {
+        val maybe = rxMaybe(CommonPool) {
+            "OK"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testMaybeAwait() = runBlocking {
+        assertEquals("OK", Maybe.just("O").await() + "K")
+    }
+
+    @Test
+    fun testMaybeAwaitForNull() = runBlocking {
+        assertNull(Maybe.empty<String>().await())
+    }
+
+    @Test
+    fun testMaybeEmitAndAwait() {
+        val maybe = rxMaybe(CommonPool) {
+            Maybe.just("O").await() + "K"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testMaybeWithDelay() {
+        val maybe = rxMaybe(CommonPool) {
+            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testMaybeException() {
+        val maybe = rxMaybe(CommonPool) {
+            Observable.just("O", "K").awaitSingle() + "K"
+        }
+
+        checkErroneous(maybe) {
+            check(it is IllegalArgumentException) { "$it" } // check() runs even without the JVM -ea flag
+        }
+    }
+
+    @Test
+    fun testAwaitFirst() {
+        val maybe = rxMaybe(CommonPool) {
+            Observable.just("O", "#").awaitFirst() + "K"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testAwaitLast() {
+        val maybe = rxMaybe(CommonPool) {
+            Observable.just("#", "O").awaitLast() + "K"
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testExceptionFromObservable() {
+        val maybe = rxMaybe(CommonPool) {
+            try {
+                Observable.error<String>(RuntimeException("O")).awaitFirst()
+            } catch (e: RuntimeException) {
+                Observable.just(e.message!!).awaitLast() + "K"
+            }
+        }
+
+        checkMaybeValue(maybe) {
+            assertEquals("OK", it)
+        }
+    }
+
+    @Test
+    fun testExceptionFromCoroutine() {
+        val maybe = rxMaybe<String>(CommonPool) {
+            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
+        }
+
+        checkErroneous(maybe) {
+            check(it is IllegalStateException) { "$it" } // check() runs even without the JVM -ea flag
+            assertEquals("OK", it.message)
+        }
+    }
+}