blob: d12a6280eb1229d099bbc449f7d1032f2f84b4c3 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
Roman Elizarov0950dfa2018-07-13 10:33:25 +03005package kotlinx.coroutines.reactive
Roman Elizarov331750b2017-02-15 17:59:17 +03006
Roman Elizarov0950dfa2018-07-13 10:33:25 +03007import kotlinx.coroutines.CancellationException
8import kotlinx.coroutines.Job
9import kotlinx.coroutines.suspendCancellableCoroutine
Roman Elizarov331750b2017-02-15 17:59:17 +030010import org.reactivestreams.Publisher
11import org.reactivestreams.Subscriber
12import org.reactivestreams.Subscription
Roman Elizarov0950dfa2018-07-13 10:33:25 +030013import kotlin.coroutines.*
Roman Elizarov331750b2017-02-15 17:59:17 +030014
15/**
16 * Awaits for the first value from the given publisher without blocking a thread and
17 * returns the resulting value or throws the corresponding exception if this publisher had produced error.
18 *
19 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030020 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +030021 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +030022 *
23 * @throws NoSuchElementException if publisher does not emit any value
Roman Elizarov331750b2017-02-15 17:59:17 +030024 */
25public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
26
27/**
Roman Elizarov4a67afb2017-03-16 11:10:47 +030028 * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
29 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
30 *
31 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030032 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov4a67afb2017-03-16 11:10:47 +030033 * immediately resumes with [CancellationException].
34 */
35public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
36
37/**
Konrad KamiƄskib8ed47c2018-02-06 12:17:53 +010038 * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
39 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
40 *
41 * This suspending function is cancellable.
42 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
43 * immediately resumes with [CancellationException].
44 */
45public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
46
47/**
48 * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
49 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
50 *
51 * This suspending function is cancellable.
52 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
53 * immediately resumes with [CancellationException].
54 */
55public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
56
57/**
Roman Elizarov331750b2017-02-15 17:59:17 +030058 * Awaits for the last value from the given publisher without blocking a thread and
59 * returns the resulting value or throws the corresponding exception if this publisher had produced error.
60 *
61 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030062 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +030063 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +030064 *
65 * @throws NoSuchElementException if publisher does not emit any value
Roman Elizarov331750b2017-02-15 17:59:17 +030066 */
67public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
68
69/**
70 * Awaits for the single value from the given publisher without blocking a thread and
71 * returns the resulting value or throws the corresponding exception if this publisher had produced error.
72 *
73 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030074 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +030075 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +030076 *
77 * @throws NoSuchElementException if publisher does not emit any value
78 * @throws IllegalArgumentException if publisher emits more than one value
Roman Elizarov331750b2017-02-15 17:59:17 +030079 */
80public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
81
82// ------------------------ private ------------------------
83
84private enum class Mode(val s: String) {
85 FIRST("awaitFirst"),
Roman Elizarov4a67afb2017-03-16 11:10:47 +030086 FIRST_OR_DEFAULT("awaitFirstOrDefault"),
Roman Elizarov331750b2017-02-15 17:59:17 +030087 LAST("awaitLast"),
88 SINGLE("awaitSingle");
89 override fun toString(): String = s
90}
91
Roman Elizarov4a67afb2017-03-16 11:10:47 +030092private suspend fun <T> Publisher<T>.awaitOne(
93 mode: Mode,
94 default: T? = null
95): T = suspendCancellableCoroutine { cont ->
Roman Elizarov331750b2017-02-15 17:59:17 +030096 subscribe(object : Subscriber<T> {
97 private lateinit var subscription: Subscription
98 private var value: T? = null
99 private var seenValue = false
100
101 override fun onSubscribe(sub: Subscription) {
102 subscription = sub
Vsevolod Tolstopyatov80a29472018-04-17 16:02:02 +0300103 cont.invokeOnCancellation { sub.cancel() }
Roman Elizarov331750b2017-02-15 17:59:17 +0300104 sub.request(if (mode == Mode.FIRST) 1 else Long.MAX_VALUE)
105 }
106
107 override fun onNext(t: T) {
108 when (mode) {
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300109 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
Roman Elizarov1b9cbf42017-07-12 12:43:24 +0300110 if (!seenValue) {
111 seenValue = true
Roman Elizarov1b9cbf42017-07-12 12:43:24 +0300112 subscription.cancel()
Roman Elizarov3e9f2442018-04-28 17:38:22 +0300113 cont.resume(t)
Roman Elizarov1b9cbf42017-07-12 12:43:24 +0300114 }
Roman Elizarov331750b2017-02-15 17:59:17 +0300115 }
116 Mode.LAST, Mode.SINGLE -> {
117 if (mode == Mode.SINGLE && seenValue) {
Roman Elizarov3e9f2442018-04-28 17:38:22 +0300118 subscription.cancel()
Roman Elizarov331750b2017-02-15 17:59:17 +0300119 if (cont.isActive)
Vsevolod Tolstopyatov85c7f4d2019-02-19 15:55:24 +0300120 cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
Roman Elizarov331750b2017-02-15 17:59:17 +0300121 } else {
122 value = t
123 seenValue = true
124 }
125 }
126 }
127 }
128
Roman Elizarov87142a22017-07-21 19:08:45 +0300129 @Suppress("UNCHECKED_CAST")
Roman Elizarov331750b2017-02-15 17:59:17 +0300130 override fun onComplete() {
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300131 if (seenValue) {
132 if (cont.isActive) cont.resume(value as T)
Roman Elizarov331750b2017-02-15 17:59:17 +0300133 return
134 }
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300135 when {
136 mode == Mode.FIRST_OR_DEFAULT -> {
137 cont.resume(default as T)
138 }
139 cont.isActive -> {
140 cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
141 }
142 }
Roman Elizarov331750b2017-02-15 17:59:17 +0300143 }
144
145 override fun onError(e: Throwable) {
146 cont.resumeWithException(e)
147 }
148 })
149}
150