blob: 0feef612fcf8dd5f5e62f839065826981fe3dbf2 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental.reactive
18
19import kotlinx.coroutines.experimental.CancellationException
20import kotlinx.coroutines.experimental.Job
21import kotlinx.coroutines.experimental.suspendCancellableCoroutine
22import org.reactivestreams.Publisher
23import org.reactivestreams.Subscriber
24import org.reactivestreams.Subscription
25
26/**
27 * Awaits for the first value from the given publisher without blocking a thread and
28 * returns the resulting value or throws the corresponding exception if this publisher had produced error.
29 *
30 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030031 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +030032 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +030033 *
34 * @throws NoSuchElementException if publisher does not emit any value
Roman Elizarov331750b2017-02-15 17:59:17 +030035 */
36public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
37
38/**
Roman Elizarov4a67afb2017-03-16 11:10:47 +030039 * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
40 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
41 *
42 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030043 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov4a67afb2017-03-16 11:10:47 +030044 * immediately resumes with [CancellationException].
45 */
46public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
47
48/**
Konrad KamiƄskib8ed47c2018-02-06 12:17:53 +010049 * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
50 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
51 *
52 * This suspending function is cancellable.
53 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
54 * immediately resumes with [CancellationException].
55 */
56public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
57
58/**
59 * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
60 * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
61 *
62 * This suspending function is cancellable.
63 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
64 * immediately resumes with [CancellationException].
65 */
66public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
67
68/**
Roman Elizarov331750b2017-02-15 17:59:17 +030069 * Awaits for the last value from the given publisher without blocking a thread and
70 * returns the resulting value or throws the corresponding exception if this publisher had produced error.
71 *
72 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030073 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +030074 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +030075 *
76 * @throws NoSuchElementException if publisher does not emit any value
Roman Elizarov331750b2017-02-15 17:59:17 +030077 */
78public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
79
80/**
81 * Awaits for the single value from the given publisher without blocking a thread and
82 * returns the resulting value or throws the corresponding exception if this publisher had produced error.
83 *
84 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +030085 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarov331750b2017-02-15 17:59:17 +030086 * immediately resumes with [CancellationException].
Roman Elizarov4a67afb2017-03-16 11:10:47 +030087 *
88 * @throws NoSuchElementException if publisher does not emit any value
89 * @throws IllegalArgumentException if publisher emits more than one value
Roman Elizarov331750b2017-02-15 17:59:17 +030090 */
91public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
92
93// ------------------------ private ------------------------
94
95private enum class Mode(val s: String) {
96 FIRST("awaitFirst"),
Roman Elizarov4a67afb2017-03-16 11:10:47 +030097 FIRST_OR_DEFAULT("awaitFirstOrDefault"),
Roman Elizarov331750b2017-02-15 17:59:17 +030098 LAST("awaitLast"),
99 SINGLE("awaitSingle");
100 override fun toString(): String = s
101}
102
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300103private suspend fun <T> Publisher<T>.awaitOne(
104 mode: Mode,
105 default: T? = null
106): T = suspendCancellableCoroutine { cont ->
Roman Elizarov331750b2017-02-15 17:59:17 +0300107 subscribe(object : Subscriber<T> {
108 private lateinit var subscription: Subscription
109 private var value: T? = null
110 private var seenValue = false
111
112 override fun onSubscribe(sub: Subscription) {
113 subscription = sub
Vsevolod Tolstopyatov80a29472018-04-17 16:02:02 +0300114 cont.invokeOnCancellation { sub.cancel() }
Roman Elizarov331750b2017-02-15 17:59:17 +0300115 sub.request(if (mode == Mode.FIRST) 1 else Long.MAX_VALUE)
116 }
117
118 override fun onNext(t: T) {
119 when (mode) {
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300120 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
Roman Elizarov1b9cbf42017-07-12 12:43:24 +0300121 if (!seenValue) {
122 seenValue = true
Roman Elizarov1b9cbf42017-07-12 12:43:24 +0300123 subscription.cancel()
Roman Elizarov3e9f2442018-04-28 17:38:22 +0300124 cont.resume(t)
Roman Elizarov1b9cbf42017-07-12 12:43:24 +0300125 }
Roman Elizarov331750b2017-02-15 17:59:17 +0300126 }
127 Mode.LAST, Mode.SINGLE -> {
128 if (mode == Mode.SINGLE && seenValue) {
Roman Elizarov3e9f2442018-04-28 17:38:22 +0300129 subscription.cancel()
Roman Elizarov331750b2017-02-15 17:59:17 +0300130 if (cont.isActive)
131 cont.resumeWithException(IllegalArgumentException("More that one onNext value for $mode"))
Roman Elizarov331750b2017-02-15 17:59:17 +0300132 } else {
133 value = t
134 seenValue = true
135 }
136 }
137 }
138 }
139
Roman Elizarov87142a22017-07-21 19:08:45 +0300140 @Suppress("UNCHECKED_CAST")
Roman Elizarov331750b2017-02-15 17:59:17 +0300141 override fun onComplete() {
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300142 if (seenValue) {
143 if (cont.isActive) cont.resume(value as T)
Roman Elizarov331750b2017-02-15 17:59:17 +0300144 return
145 }
Roman Elizarov4a67afb2017-03-16 11:10:47 +0300146 when {
147 mode == Mode.FIRST_OR_DEFAULT -> {
148 cont.resume(default as T)
149 }
150 cont.isActive -> {
151 cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
152 }
153 }
Roman Elizarov331750b2017-02-15 17:59:17 +0300154 }
155
156 override fun onError(e: Throwable) {
157 cont.resumeWithException(e)
158 }
159 })
160}
161