blob: a12449483f6e843f199b751ece10092168264ca1 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
5package kotlinx.coroutines.experimental.rx2
6
7import io.reactivex.*
8import io.reactivex.disposables.Disposable
9import kotlinx.coroutines.experimental.CancellableContinuation
10import kotlinx.coroutines.experimental.CancellationException
11import kotlinx.coroutines.experimental.Job
12import kotlinx.coroutines.experimental.suspendCancellableCoroutine
13
14// ------------------------ CompletableSource ------------------------
15
16/**
17 * Awaits for completion of this completable without blocking a thread.
18 * Returns `Unit` or throws the corresponding exception if this completable had produced error.
19 *
Roman Elizarovd82b3a92017-06-23 21:52:08 +030020 * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
Roman Elizarov331750b2017-02-15 17:59:17 +030021 * suspending function is suspended, this function immediately resumes with [CancellationException].
22 */
23public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
24 subscribe(object : CompletableObserver {
Roman Elizarov3e9f2442018-04-28 17:38:22 +030025 override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
Roman Elizarov331750b2017-02-15 17:59:17 +030026 override fun onComplete() { cont.resume(Unit) }
Roman Elizarovf84f99f2017-03-06 12:31:45 +030027 override fun onError(e: Throwable) { cont.resumeWithException(e) }
Roman Elizarov331750b2017-02-15 17:59:17 +030028 })
29}
30
Konrad Kamińskid6bb1482017-04-07 09:26:40 +020031// ------------------------ MaybeSource ------------------------
32
33/**
34 * Awaits for completion of the maybe without blocking a thread.
35 * Returns the resulting value, null if no value was produced or throws the corresponding exception if this
36 * maybe had produced error.
37 *
38 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030039 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Konrad Kamińskid6bb1482017-04-07 09:26:40 +020040 * immediately resumes with [CancellationException].
41 */
42@Suppress("UNCHECKED_CAST")
43public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)
44
45/**
46 * Awaits for completion of the maybe without blocking a thread.
47 * Returns the resulting value, [default] if no value was produced or throws the corresponding exception if this
48 * maybe had produced error.
49 *
50 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030051 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Konrad Kamińskid6bb1482017-04-07 09:26:40 +020052 * immediately resumes with [CancellationException].
53 */
54public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
55 subscribe(object : MaybeObserver<T> {
Roman Elizarov3e9f2442018-04-28 17:38:22 +030056 override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
Konrad Kamińskid6bb1482017-04-07 09:26:40 +020057 override fun onComplete() { cont.resume(default) }
58 override fun onSuccess(t: T) { cont.resume(t) }
59 override fun onError(error: Throwable) { cont.resumeWithException(error) }
60 })
61}
62
Roman Elizarov331750b2017-02-15 17:59:17 +030063// ------------------------ SingleSource ------------------------
64
65/**
66 * Awaits for completion of the single value without blocking a thread.
67 * Returns the resulting value or throws the corresponding exception if this single had produced error.
68 *
69 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030070 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +030071 * immediately resumes with [CancellationException].
72 */
73public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
74 subscribe(object : SingleObserver<T> {
Roman Elizarov3e9f2442018-04-28 17:38:22 +030075 override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
Roman Elizarov331750b2017-02-15 17:59:17 +030076 override fun onSuccess(t: T) { cont.resume(t) }
77 override fun onError(error: Throwable) { cont.resumeWithException(error) }
78 })
79}
80
81// ------------------------ ObservableSource ------------------------
82
83/**
84 * Awaits for the first value from the given observable without blocking a thread.
85 * Returns the resulting value or throws the corresponding exception if this observable had produced error.
86 *
87 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030088 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +030089 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +030090 *
91 * @throws NoSuchElementException if observable does not emit any value
Roman Elizarov331750b2017-02-15 17:59:17 +030092 */
93public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
94
95/**
Roman Elizarov4a67afb2017-03-16 11:10:47 +030096 * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
97 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
98 *
99 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300100 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300101 * immediately resumes with [CancellationException].
102 */
103public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
104
105/**
Konrad Kamińskib8ed47c2018-02-06 12:17:53 +0100106 * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
107 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
108 *
109 * This suspending function is cancellable.
110 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
111 * immediately resumes with [CancellationException].
112 */
113public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
114
115/**
116 * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
117 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
118 *
119 * This suspending function is cancellable.
120 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
121 * immediately resumes with [CancellationException].
122 */
123public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
124
125/**
Roman Elizarov331750b2017-02-15 17:59:17 +0300126 * Awaits for the last value from the given observable without blocking a thread.
127 * Returns the resulting value or throws the corresponding exception if this observable had produced error.
128 *
129 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300130 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +0300131 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300132 *
133 * @throws NoSuchElementException if observable does not emit any value
Roman Elizarov331750b2017-02-15 17:59:17 +0300134 */
135public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
136
137/**
138 * Awaits for the single value from the given observable without blocking a thread.
139 * Returns the resulting value or throws the corresponding exception if this observable had produced error.
140 *
141 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300142 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +0300143 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300144 *
145 * @throws NoSuchElementException if observable does not emit any value
146 * @throws IllegalArgumentException if observable emits more than one value
Roman Elizarov331750b2017-02-15 17:59:17 +0300147 */
148public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
149
150// ------------------------ private ------------------------
151
Roman Elizarov3e9f2442018-04-28 17:38:22 +0300152internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
Vsevolod Tolstopyatov80a29472018-04-17 16:02:02 +0300153 invokeOnCancellation { d.dispose() }
Roman Elizarov331750b2017-02-15 17:59:17 +0300154
155private enum class Mode(val s: String) {
156 FIRST("awaitFirst"),
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300157 FIRST_OR_DEFAULT("awaitFirstOrDefault"),
Roman Elizarov331750b2017-02-15 17:59:17 +0300158 LAST("awaitLast"),
159 SINGLE("awaitSingle");
160 override fun toString(): String = s
161}
162
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300163private suspend fun <T> ObservableSource<T>.awaitOne(
164 mode: Mode,
165 default: T? = null
166): T = suspendCancellableCoroutine { cont ->
Roman Elizarov331750b2017-02-15 17:59:17 +0300167 subscribe(object : Observer<T> {
168 private lateinit var subscription: Disposable
169 private var value: T? = null
170 private var seenValue = false
171
172 override fun onSubscribe(sub: Disposable) {
173 subscription = sub
Vsevolod Tolstopyatov80a29472018-04-17 16:02:02 +0300174 cont.invokeOnCancellation { sub.dispose() }
Roman Elizarov331750b2017-02-15 17:59:17 +0300175 }
176
177 override fun onNext(t: T) {
178 when (mode) {
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300179 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
Roman Elizarov1b9cbf42017-07-12 12:43:24 +0300180 if (!seenValue) {
181 seenValue = true
182 cont.resume(t)
183 subscription.dispose()
184 }
Roman Elizarov331750b2017-02-15 17:59:17 +0300185 }
186 Mode.LAST, Mode.SINGLE -> {
187 if (mode == Mode.SINGLE && seenValue) {
188 if (cont.isActive)
189 cont.resumeWithException(IllegalArgumentException("More that one onNext value for $mode"))
190 subscription.dispose()
191 } else {
192 value = t
193 seenValue = true
194 }
195 }
196 }
197 }
198
Roman Elizarove233bf92017-07-12 13:00:43 +0300199 @Suppress("UNCHECKED_CAST")
Roman Elizarov331750b2017-02-15 17:59:17 +0300200 override fun onComplete() {
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300201 if (seenValue) {
202 if (cont.isActive) cont.resume(value as T)
Roman Elizarov331750b2017-02-15 17:59:17 +0300203 return
204 }
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300205 when {
206 mode == Mode.FIRST_OR_DEFAULT -> {
207 cont.resume(default as T)
208 }
209 cont.isActive -> {
210 cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
211 }
212 }
Roman Elizarov331750b2017-02-15 17:59:17 +0300213 }
214
215 override fun onError(e: Throwable) {
216 cont.resumeWithException(e)
217 }
218 })
219}
220