blob: f828c0d6a651bd2614edca0885a15bc9b4a005df [file] [log] [blame]
package kotlinx.coroutines.experimental.reactor
import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.channels.ProducerScope
import kotlinx.coroutines.experimental.reactive.publish
import reactor.core.publisher.Flux
import kotlin.coroutines.experimental.CoroutineContext
/**
* Creates cold reactive [Flux] that runs a given [block] in a coroutine.
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
* Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
*
* | **Coroutine action** | **Signal to subscriber**
* | -------------------------------------------- | ------------------------
* | `send` | `onNext`
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*/
@JvmOverloads // for binary compatibility with older code compiled before context had a default
fun <T> flux(
context: CoroutineContext = DefaultDispatcher,
block: suspend ProducerScope<T>.() -> Unit
): Flux<T> = Flux.from(publish(context, block = block))