blob: 9eeee9ceaaf6edfe5a378f9c29664c41837c7f64 [file] [log] [blame]
Konrad Kamiński3ae898c2017-03-30 17:37:00 +02001package kotlinx.coroutines.experimental.reactor
2
Francesco Vascoe19ee042017-11-29 22:12:35 +01003import kotlinx.coroutines.experimental.DefaultDispatcher
Konrad Kamiński3ae898c2017-03-30 17:37:00 +02004import kotlinx.coroutines.experimental.Deferred
5import kotlinx.coroutines.experimental.Job
6import kotlinx.coroutines.experimental.channels.ReceiveChannel
7import reactor.core.publisher.Flux
8import reactor.core.publisher.Mono
9import kotlin.coroutines.experimental.CoroutineContext
10
/**
 * Exposes this job as a hot reactive [Mono] that emits
 * [success][reactor.core.publisher.MonoSink.success] once the job has completed.
 *
 * All subscribers are signalled by the completion of the same underlying job.
 * Cancelling a subscription to the returned mono **does not** cancel or otherwise affect this job;
 * the mono merely waits for the job via [Job.join].
 *
 * @param context the coroutine context in which the returned mono is signalled;
 * defaults to [DefaultDispatcher]
 * @return a mono that completes with [Unit] after this job completes
 */
public fun Job.asMono(context: CoroutineContext = DefaultDispatcher): Mono<Unit> {
    val job = this
    return mono(context) {
        // Suspend until the wrapped job is complete; the Unit result becomes the mono's value.
        job.join()
    }
}
Konrad Kamiński3ae898c2017-03-30 17:37:00 +020021
/**
 * Exposes this deferred value as a hot reactive [Mono] that emits either
 * [success][reactor.core.publisher.MonoSink.success] or
 * [error][reactor.core.publisher.MonoSink.error], depending on how the deferred completes.
 *
 * All subscribers observe the completion value of the same underlying deferred.
 * Cancelling a subscription to the returned mono **does not** cancel or otherwise affect this deferred;
 * the mono merely waits for the result via [Deferred.await].
 *
 * @param context the coroutine context in which the returned mono is signalled;
 * defaults to [DefaultDispatcher]
 * @return a mono carrying the awaited result of this deferred
 */
public fun <T> Deferred<T?>.asMono(context: CoroutineContext = DefaultDispatcher): Mono<T> {
    val deferred = this
    return mono(context) {
        // The awaited (possibly null) result is handed to the mono builder as the block's value.
        deferred.await()
    }
}
Konrad Kamiński3ae898c2017-03-30 17:37:00 +020032
/**
 * Exposes the elements received from this channel as a hot reactive [Flux].
 *
 * Subscribers consume from the same channel in **fan-out** fashion: if there are multiple subscribers,
 * the channel's elements are shared among them in a round-robin way rather than being
 * duplicated to each one.
 *
 * @param context the coroutine context in which the returned flux is signalled;
 * defaults to [DefaultDispatcher]
 * @return a flux that re-emits every element it receives from this channel
 */
public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = DefaultDispatcher): Flux<T> {
    val source = this
    return flux(context) {
        // Explicit form of `for (element in source)`: pull from the channel until it is exhausted,
        // forwarding each element to the flux.
        val elements = source.iterator()
        while (elements.hasNext()) {
            send(elements.next())
        }
    }
}