/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
Konrad Kamiński3ae898c2017-03-30 17:37:00 +02005package kotlinx.coroutines.experimental.reactor
6
Francesco Vascoe19ee042017-11-29 22:12:35 +01007import kotlinx.coroutines.experimental.DefaultDispatcher
Konrad Kamiński3ae898c2017-03-30 17:37:00 +02008import kotlinx.coroutines.experimental.Deferred
9import kotlinx.coroutines.experimental.Job
10import kotlinx.coroutines.experimental.channels.ReceiveChannel
11import reactor.core.publisher.Flux
12import reactor.core.publisher.Mono
13import kotlin.coroutines.experimental.CoroutineContext
14
/**
 * Converts this job to the hot reactive mono that signals
 * with [success][reactor.core.publisher.MonoSink.success] when the corresponding job completes.
 *
 * The conversion is implemented by running a coroutine in the given [context] that simply
 * [joins][Job.join] this job, so the mono is signalled once the join returns.
 *
 * Every subscriber gets the signal at the same time.
 * Unsubscribing from the resulting mono **does not** affect the original job in any way.
 *
 * @param context -- the coroutine context from which the resulting mono is going to be signalled
 * @return a mono of [Unit] that is signalled when this job completes
 */
public fun Job.asMono(context: CoroutineContext = DefaultDispatcher): Mono<Unit> = mono(context) { this@asMono.join() }
Konrad Kamiński3ae898c2017-03-30 17:37:00 +020025
/**
 * Converts this deferred value to the hot reactive mono that signals
 * [success][reactor.core.publisher.MonoSink.success] or [error][reactor.core.publisher.MonoSink.error].
 *
 * The conversion is implemented by running a coroutine in the given [context] that
 * [awaits][Deferred.await] this deferred value and hands the outcome to the mono.
 *
 * Every subscriber gets the same completion value.
 * Unsubscribing from the resulting mono **does not** affect the original deferred value in any way.
 *
 * NOTE(review): the receiver's value type is nullable (`T?`) while the result is `Mono<T>`;
 * presumably a `null` value completes the mono without emitting an item -- confirm against
 * the `mono` builder.
 *
 * @param context -- the coroutine context from which the resulting mono is going to be signalled
 * @return a mono that is signalled with the outcome of awaiting this deferred value
 */
public fun <T> Deferred<T?>.asMono(context: CoroutineContext = DefaultDispatcher): Mono<T> = mono(context) { this@asMono.await() }
Konrad Kamiński3ae898c2017-03-30 17:37:00 +020036
/**
 * Converts a stream of elements received from the channel to the hot reactive flux.
 *
 * The conversion is implemented by running a coroutine in the given [context] that iterates
 * over this channel and sends every received element to the flux.
 *
 * Every subscriber receives values from this channel in **fan-out** fashion. If there are multiple subscribers,
 * they'll receive values in round-robin way.
 *
 * @param context -- the coroutine context from which the resulting flux is going to be signalled
 * @return a flux that emits the elements received from this channel
 */
public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = DefaultDispatcher): Flux<T> = flux(context) {
    // Forward elements one by one; the loop ends when iteration over the channel ends.
    for (t in this@asFlux)
        send(t)
}