blob: f828c0d6a651bd2614edca0885a15bc9b4a005df [file] [log] [blame]
Konrad Kamiński3ae898c2017-03-30 17:37:00 +02001package kotlinx.coroutines.experimental.reactor
2
Francesco Vascoe19ee042017-11-29 22:12:35 +01003import kotlinx.coroutines.experimental.DefaultDispatcher
Konrad Kamiński3ae898c2017-03-30 17:37:00 +02004import kotlinx.coroutines.experimental.channels.ProducerScope
5import kotlinx.coroutines.experimental.reactive.publish
6import reactor.core.publisher.Flux
7import kotlin.coroutines.experimental.CoroutineContext
8
9/**
10 * Creates cold reactive [Flux] that runs a given [block] in a coroutine.
11 * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
12 * Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
13 *
14 * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
15 * `onNext` is not invoked concurrently.
16 *
17 * | **Coroutine action** | **Signal to subscriber**
18 * | -------------------------------------------- | ------------------------
19 * | `send` | `onNext`
20 * | Normal completion or `close` without cause | `onComplete`
21 * | Failure with exception or `close` with cause | `onError`
22 */
Francesco Vascoe19ee042017-11-29 22:12:35 +010023@JvmOverloads // for binary compatibility with older code compiled before context had a default
Konrad Kamiński3ae898c2017-03-30 17:37:00 +020024fun <T> flux(
Francesco Vascoe19ee042017-11-29 22:12:35 +010025 context: CoroutineContext = DefaultDispatcher,
Konrad Kamiński3ae898c2017-03-30 17:37:00 +020026 block: suspend ProducerScope<T>.() -> Unit
Roman Elizarove8f694e2017-11-28 10:12:00 +030027): Flux<T> = Flux.from(publish(context, block = block))