/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

Roman Elizarov0950dfa2018-07-13 10:33:25 +03005package kotlinx.coroutines.reactive
Roman Elizarov331750b2017-02-15 17:59:17 +03006
Roman Elizarov0950dfa2018-07-13 10:33:25 +03007import kotlinx.coroutines.CancellationException
8import kotlinx.coroutines.Job
9import kotlinx.coroutines.suspendCancellableCoroutine
Roman Elizarov331750b2017-02-15 17:59:17 +030010import org.reactivestreams.Publisher
11import org.reactivestreams.Subscriber
12import org.reactivestreams.Subscription
SokolovaMaria1dcfd972019-08-09 17:35:14 +030013import java.util.*
Roman Elizarov0950dfa2018-07-13 10:33:25 +030014import kotlin.coroutines.*
Roman Elizarov331750b2017-02-15 17:59:17 +030015
/**
 * Suspends, without blocking a thread, until this publisher emits its first value and returns that value.
 * If the publisher signals an error instead, the error is rethrown to the caller.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is suspended, the
 * subscription is cancelled and the function immediately resumes with [CancellationException].
 *
 * @return the first value emitted by the publisher
 * @throws NoSuchElementException if the publisher completes without emitting any value
 */
public suspend fun <T> Publisher<T>.awaitFirst(): T {
    return awaitOne(Mode.FIRST)
}
27
/**
 * Suspends, without blocking a thread, until this publisher emits its first value and returns that value,
 * or returns [default] if the publisher completes without emitting anything.
 * If the publisher signals an error instead, the error is rethrown to the caller.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is suspended, the
 * subscription is cancelled and the function immediately resumes with [CancellationException].
 *
 * @param default the value to return when the publisher completes empty
 * @return the first emitted value, or [default] when there is none
 */
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T {
    return awaitOne(Mode.FIRST_OR_DEFAULT, default)
}
37
38/**
Konrad Kamińskib8ed47c2018-02-06 12:17:53 +010039 * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
40 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
41 *
42 * This suspending function is cancellable.
43 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
44 * immediately resumes with [CancellationException].
45 */
46public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
47
/**
 * Suspends, without blocking a thread, until this publisher emits its first value and returns that value,
 * or returns the result of calling [defaultValue] if the publisher completes without emitting anything.
 * If the publisher signals an error instead, the error is rethrown to the caller.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is suspended, the
 * subscription is cancelled and the function immediately resumes with [CancellationException].
 *
 * @param defaultValue supplier invoked only when no value was received
 * @return the first emitted value, or the result of [defaultValue] when there is none
 */
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T {
    // No explicit default is passed, so an empty publisher surfaces here as null.
    val first: T? = awaitOne(Mode.FIRST_OR_DEFAULT)
    return first ?: defaultValue()
}
57
/**
 * Suspends, without blocking a thread, until this publisher completes and returns the last value it emitted.
 * If the publisher signals an error instead, the error is rethrown to the caller.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is suspended, the
 * subscription is cancelled and the function immediately resumes with [CancellationException].
 *
 * @return the last value emitted before completion
 * @throws NoSuchElementException if the publisher completes without emitting any value
 */
public suspend fun <T> Publisher<T>.awaitLast(): T {
    return awaitOne(Mode.LAST)
}
69
/**
 * Suspends, without blocking a thread, until this publisher completes and returns the one value it emitted.
 * If the publisher signals an error instead, the error is rethrown to the caller.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is suspended, the
 * subscription is cancelled and the function immediately resumes with [CancellationException].
 *
 * @return the single value emitted by the publisher
 * @throws NoSuchElementException if the publisher completes without emitting any value
 * @throws IllegalArgumentException if the publisher emits more than one value
 */
public suspend fun <T> Publisher<T>.awaitSingle(): T {
    return awaitOne(Mode.SINGLE)
}
82
/**
 * Suspends, without blocking a thread, until this publisher completes and returns the one value it emitted,
 * or returns [default] if the publisher completes without emitting anything.
 * If the publisher signals an error instead, the error is rethrown to the caller.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is suspended, the
 * subscription is cancelled and the function immediately resumes with [CancellationException].
 *
 * @param default the value to return when the publisher completes empty
 * @return the single emitted value, or [default] when there is none
 * @throws IllegalArgumentException if the publisher emits more than one value
 */
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T {
    return awaitOne(Mode.SINGLE_OR_DEFAULT, default)
}
95
/**
 * Suspends, without blocking a thread, until this publisher completes and returns the one value it emitted,
 * or returns `null` if the publisher completes without emitting anything.
 * If the publisher signals an error instead, the error is rethrown to the caller.
 *
 * The return type is nullable because an empty publisher produces `null`; callers must handle that case
 * explicitly (this mirrors [awaitFirstOrNull]).
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is suspended, the
 * subscription is cancelled and the function immediately resumes with [CancellationException].
 *
 * @return the single emitted value, or `null` when there is none
 * @throws IllegalArgumentException if the publisher emits more than one value
 */
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = awaitOne(Mode.SINGLE_OR_DEFAULT)
108
/**
 * Suspends, without blocking a thread, until this publisher completes and returns the one value it emitted,
 * or returns the result of calling [defaultValue] if the publisher completes without emitting anything.
 * If the publisher signals an error instead, the error is rethrown to the caller.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is suspended, the
 * subscription is cancelled and the function immediately resumes with [CancellationException].
 *
 * @param defaultValue supplier invoked only when no value was received
 * @return the single emitted value, or the result of [defaultValue] when there is none
 * @throws IllegalArgumentException if the publisher emits more than one value
 */
public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T {
    // No explicit default is passed, so an empty publisher surfaces here as null.
    val single: T? = awaitOne(Mode.SINGLE_OR_DEFAULT)
    return single ?: defaultValue()
}
121
// ------------------------ private ------------------------
123
/**
 * Selects which element of a publisher [awaitOne] waits for and how an empty publisher is treated.
 *
 * @property s name of the corresponding public `await*` function; it is what [toString] returns, so it is
 * the text that appears when a mode is interpolated into an error message.
 */
private enum class Mode(val s: String) {
    FIRST("awaitFirst"),
    FIRST_OR_DEFAULT("awaitFirstOrDefault"),
    LAST("awaitLast"),
    SINGLE("awaitSingle"),
    SINGLE_OR_DEFAULT("awaitSingleOrDefault");

    /** Returns the function name associated with this mode rather than the enum constant name. */
    override fun toString(): String {
        return s
    }
}
132
/**
 * Subscribes to this publisher and suspends until the element selected by [mode] is available.
 *
 * The publisher is passed through `injectCoroutineContext` (a helper defined elsewhere) together with the
 * continuation's context before subscribing. Cancelling the continuation cancels the subscription.
 *
 * Behaviour per mode:
 * - [Mode.FIRST], [Mode.FIRST_OR_DEFAULT]: requests one element, resumes with the first value and cancels
 *   the subscription.
 * - [Mode.LAST]: requests an unbounded amount and resumes with the most recent value on completion.
 * - [Mode.SINGLE], [Mode.SINGLE_OR_DEFAULT]: like [Mode.LAST], but a second value cancels the subscription
 *   and fails with [IllegalArgumentException].
 *
 * When the publisher completes without a value, the `*_OR_DEFAULT` modes resume with [default] (unchecked
 * cast to `T`, so it may be `null`), and the other modes fail with [NoSuchElementException].
 *
 * Every terminal resumption is guarded by `cont.isActive`, so a signal delivered after the continuation has
 * already been resumed or cancelled is ignored instead of attempting a second resumption.
 *
 * @param mode which element to wait for and how to treat an empty publisher
 * @param default value used by the `*_OR_DEFAULT` modes when nothing was emitted
 * @return the selected element, or [default] for the `*_OR_DEFAULT` modes on an empty publisher
 * @throws NoSuchElementException if nothing was emitted and [mode] has no default
 * @throws IllegalArgumentException if more than one value was emitted in a `SINGLE*` mode
 */
private suspend fun <T> Publisher<T>.awaitOne(
    mode: Mode,
    default: T? = null
): T = suspendCancellableCoroutine { cont ->
    injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
        private lateinit var subscription: Subscription
        private var value: T? = null
        private var seenValue = false

        override fun onSubscribe(sub: Subscription) {
            subscription = sub
            cont.invokeOnCancellation { sub.cancel() }
            // Both FIRST* modes consume exactly one element, so there is no reason to demand more.
            val firstOnly = mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT
            sub.request(if (firstOnly) 1 else Long.MAX_VALUE)
        }

        override fun onNext(t: T) {
            when (mode) {
                Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
                    if (!seenValue) {
                        seenValue = true
                        subscription.cancel()
                        cont.resume(t)
                    }
                }
                Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> {
                    if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) {
                        // A second value violates the "single" contract: stop the stream and fail.
                        subscription.cancel()
                        if (cont.isActive)
                            cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
                    } else {
                        // Remember the latest value; it is delivered from onComplete.
                        value = t
                        seenValue = true
                    }
                }
            }
        }

        @Suppress("UNCHECKED_CAST")
        override fun onComplete() {
            // Already resumed (e.g. by the first value or an error) or cancelled: nothing left to deliver.
            if (!cont.isActive) return
            when {
                seenValue ->
                    cont.resume(value as T)
                mode == Mode.FIRST_OR_DEFAULT || mode == Mode.SINGLE_OR_DEFAULT ->
                    cont.resume(default as T)
                else ->
                    cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
            }
        }

        override fun onError(e: Throwable) {
            // A terminal signal can still arrive after the continuation was resumed (for example after the
            // subscription was cancelled in a FIRST* mode); resuming a second time would throw from this callback.
            if (cont.isActive) cont.resumeWithException(e)
        }
    })
}
191