blob: 9838fdae45cdcaf6637b2a3b381c52c289af89d6 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental.rx1
18
19import kotlinx.coroutines.experimental.CancellableContinuation
20import kotlinx.coroutines.experimental.CancellationException
21import kotlinx.coroutines.experimental.Job
22import kotlinx.coroutines.experimental.suspendCancellableCoroutine
23import rx.*
24
Konrad Kamiński804d0362017-04-07 09:22:58 +020025// ------------------------ Completable ------------------------
26
27/**
28 * Awaits for completion of this completable without blocking a thread.
29 * Returns `Unit` or throws the corresponding exception if this completable had produced error.
30 *
Roman Elizarovd82b3a92017-06-23 21:52:08 +030031 * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
Konrad Kamiński804d0362017-04-07 09:22:58 +020032 * suspending function is suspended, this function immediately resumes with [CancellationException].
33 */
34public suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont ->
35 subscribe(object : CompletableSubscriber {
Roman Elizarov3e9f2442018-04-28 17:38:22 +030036 override fun onSubscribe(s: Subscription) { cont.unsubscribeOnCancellation(s) }
Konrad Kamiński804d0362017-04-07 09:22:58 +020037 override fun onCompleted() { cont.resume(Unit) }
38 override fun onError(e: Throwable) { cont.resumeWithException(e) }
39 })
40}
41
Roman Elizarov331750b2017-02-15 17:59:17 +030042// ------------------------ Single ------------------------
43
44/**
45 * Awaits for completion of the single value without blocking a thread and
46 * returns the resulting value or throws the corresponding exception if this single had produced error.
47 *
48 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030049 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +030050 * immediately resumes with [CancellationException].
51 */
52public suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
Roman Elizarov3e9f2442018-04-28 17:38:22 +030053 cont.unsubscribeOnCancellation(subscribe(object : SingleSubscriber<T>() {
Roman Elizarov331750b2017-02-15 17:59:17 +030054 override fun onSuccess(t: T) { cont.resume(t) }
55 override fun onError(error: Throwable) { cont.resumeWithException(error) }
56 }))
57}
58
59// ------------------------ Observable ------------------------
60
61/**
62 * Awaits for the first value from the given observable without blocking a thread and
63 * returns the resulting value or throws the corresponding exception if this observable had produced error.
64 *
65 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030066 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +030067 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +030068 *
69 * @throws NoSuchElementException if observable does not emit any value
Roman Elizarov331750b2017-02-15 17:59:17 +030070 */
71public suspend fun <T> Observable<T>.awaitFirst(): T = first().awaitOne()
72
73/**
Kirill Rakhmanff4c7c32017-03-15 13:19:56 +010074 * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
75 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
76 *
77 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030078 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Kirill Rakhmanff4c7c32017-03-15 13:19:56 +010079 * immediately resumes with [CancellationException].
80 */
81public suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T = firstOrDefault(default).awaitOne()
82
83/**
Konrad Kamińskib8ed47c2018-02-06 12:17:53 +010084 * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
85 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
86 *
87 * This suspending function is cancellable.
88 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
89 * immediately resumes with [CancellationException].
90 */
91public suspend fun <T> Observable<T>.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne()
92
93/**
94 * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
95 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
96 *
97 * This suspending function is cancellable.
98 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
99 * immediately resumes with [CancellationException].
100 */
101public suspend fun <T> Observable<T>.awaitFirstOrElse(defaultValue: () -> T): T = switchIfEmpty(Observable.fromCallable(defaultValue)).first().awaitOne()
102
103/**
Roman Elizarov331750b2017-02-15 17:59:17 +0300104 * Awaits for the last value from the given observable without blocking a thread and
105 * returns the resulting value or throws the corresponding exception if this observable had produced error.
106 *
107 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300108 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +0300109 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300110 *
111 * @throws NoSuchElementException if observable does not emit any value
Roman Elizarov331750b2017-02-15 17:59:17 +0300112 */
113public suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
114
115/**
116 * Awaits for the single value from the given observable without blocking a thread and
117 * returns the resulting value or throws the corresponding exception if this observable had produced error.
118 *
119 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300120 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +0300121 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300122 *
123 * @throws NoSuchElementException if observable does not emit any value
124 * @throws IllegalArgumentException if publisher emits more than one value
Roman Elizarov331750b2017-02-15 17:59:17 +0300125 */
126public suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne()
127
128// ------------------------ private ------------------------
129
130private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont ->
Roman Elizarov3e9f2442018-04-28 17:38:22 +0300131 cont.unsubscribeOnCancellation(subscribe(object : Subscriber<T>() {
Roman Elizarov331750b2017-02-15 17:59:17 +0300132 override fun onStart() { request(1) }
133 override fun onNext(t: T) { cont.resume(t) }
134 override fun onCompleted() { if (cont.isActive) cont.resumeWithException(IllegalStateException("Should have invoked onNext")) }
135 override fun onError(e: Throwable) { cont.resumeWithException(e) }
136 }))
137}
138
Roman Elizarov3e9f2442018-04-28 17:38:22 +0300139internal fun <T> CancellableContinuation<T>.unsubscribeOnCancellation(sub: Subscription) =
Vsevolod Tolstopyatov80a29472018-04-17 16:02:02 +0300140 invokeOnCancellation { sub.unsubscribe() }