blob: fb097ef58273aba434812adaf7b02dead435d67d [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.experimental.rx1
6
7import kotlinx.coroutines.experimental.CancellableContinuation
8import kotlinx.coroutines.experimental.CancellationException
9import kotlinx.coroutines.experimental.Job
10import kotlinx.coroutines.experimental.suspendCancellableCoroutine
11import rx.*
12
Konrad Kamiński804d0362017-04-07 09:22:58 +020013// ------------------------ Completable ------------------------
14
/**
 * Suspends the calling coroutine until this [Completable] terminates, without blocking a thread.
 * Resumes with `Unit` on completion, or rethrows the error signalled by the completable.
 *
 * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while
 * this function is suspended, it immediately resumes with [CancellationException]; the subscription received in
 * `onSubscribe` is unsubscribed when the continuation is cancelled.
 */
public suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { continuation ->
    val subscriber = object : CompletableSubscriber {
        override fun onSubscribe(s: Subscription) {
            // Register the subscription so that cancelling the continuation unsubscribes it.
            continuation.unsubscribeOnCancellation(s)
        }

        override fun onCompleted() {
            continuation.resume(Unit)
        }

        override fun onError(e: Throwable) {
            continuation.resumeWithException(e)
        }
    }
    subscribe(subscriber)
}
29
Roman Elizarov331750b2017-02-15 17:59:17 +030030// ------------------------ Single ------------------------
31
/**
 * Suspends the calling coroutine until this [Single] terminates, without blocking a thread.
 * Returns the value delivered to `onSuccess`, or rethrows the error delivered to `onError`.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException]; the subscription is unsubscribed when the continuation
 * is cancelled.
 */
public suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { continuation ->
    val subscriber = object : SingleSubscriber<T>() {
        override fun onSuccess(t: T) {
            continuation.resume(t)
        }

        override fun onError(error: Throwable) {
            continuation.resumeWithException(error)
        }
    }
    // Subscribe first, then hook the resulting subscription up to cancellation of the continuation.
    val subscription = subscribe(subscriber)
    continuation.unsubscribeOnCancellation(subscription)
}
46
47// ------------------------ Observable ------------------------
48
/**
 * Suspends until this observable emits its first value, without blocking a thread, and returns that value.
 * If the observable signals an error, the corresponding exception is thrown instead.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 *
 * @throws NoSuchElementException if observable does not emit any value
 */
public suspend fun <T> Observable<T>.awaitFirst(): T {
    val firstOnly = first()
    return firstOnly.awaitOne()
}
60
61/**
Kirill Rakhmanff4c7c32017-03-15 13:19:56 +010062 * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
63 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
64 *
65 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030066 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Kirill Rakhmanff4c7c32017-03-15 13:19:56 +010067 * immediately resumes with [CancellationException].
68 */
69public suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T = firstOrDefault(default).awaitOne()
70
71/**
Konrad Kamińskib8ed47c2018-02-06 12:17:53 +010072 * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
73 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
74 *
75 * This suspending function is cancellable.
76 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
77 * immediately resumes with [CancellationException].
78 */
79public suspend fun <T> Observable<T>.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne()
80
/**
 * Suspends until this observable emits its first value, without blocking a thread, and returns that value;
 * if the observable emits nothing, [defaultValue] is called to produce the result.
 * If the observable signals an error, the corresponding exception is thrown instead.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 */
public suspend fun <T> Observable<T>.awaitFirstOrElse(defaultValue: () -> T): T {
    // Fallback source that is switched to only when this observable turns out to be empty.
    val fallback: Observable<T> = Observable.fromCallable(defaultValue)
    return switchIfEmpty(fallback).first().awaitOne()
}
90
91/**
Roman Elizarov331750b2017-02-15 17:59:17 +030092 * Awaits for the last value from the given observable without blocking a thread and
93 * returns the resulting value or throws the corresponding exception if this observable had produced error.
94 *
95 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030096 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +030097 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +030098 *
99 * @throws NoSuchElementException if observable does not emit any value
Roman Elizarov331750b2017-02-15 17:59:17 +0300100 */
101public suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
102
/**
 * Suspends until this observable emits its single value, without blocking a thread, and returns that value.
 * If the observable signals an error, the corresponding exception is thrown instead.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately resumes with [CancellationException].
 *
 * @throws NoSuchElementException if observable does not emit any value
 * @throws IllegalArgumentException if observable emits more than one value
 */
public suspend fun <T> Observable<T>.awaitSingle(): T {
    val exactlyOne = single()
    return exactlyOne.awaitOne()
}
115
116// ------------------------ private ------------------------
117
/**
 * Subscribes to this observable, requests a single item and resumes the calling coroutine with it.
 * An error from the observable is rethrown to the caller. If the observable completes while the
 * continuation is still active, the coroutine is resumed with an [IllegalStateException].
 *
 * The subscription is unsubscribed when the continuation is cancelled.
 */
private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { continuation ->
    val subscriber = object : Subscriber<T>() {
        override fun onStart() {
            request(1)
        }

        override fun onNext(t: T) {
            continuation.resume(t)
        }

        override fun onCompleted() {
            // Nothing to do when the continuation is no longer active (e.g. onNext has already resumed it).
            if (!continuation.isActive) return
            continuation.resumeWithException(IllegalStateException("Should have invoked onNext"))
        }

        override fun onError(e: Throwable) {
            continuation.resumeWithException(e)
        }
    }
    val subscription = subscribe(subscriber)
    continuation.unsubscribeOnCancellation(subscription)
}
126
/**
 * Registers a cancellation handler on this continuation that unsubscribes [sub].
 */
internal fun <T> CancellableContinuation<T>.unsubscribeOnCancellation(sub: Subscription) =
    this.invokeOnCancellation { sub.unsubscribe() }