blob: 7c6182bf5389c190ceb7c2b975205904e634453c [file] [log] [blame]
SokolovaMaria1dcfd972019-08-09 17:35:14 +03001@file:JvmName("FlowKt")
2
3package kotlinx.coroutines.reactor
4
5import kotlinx.coroutines.*
6import kotlinx.coroutines.flow.Flow
7import kotlinx.coroutines.flow.flowOn
8import kotlinx.coroutines.reactive.FlowSubscription
9import reactor.core.CoreSubscriber
10import reactor.core.publisher.Flux
11
12/**
13 * Converts the given flow to a cold flux.
14 * The original flow is cancelled when the flux subscriber is disposed.
15 */
16@ExperimentalCoroutinesApi
17public fun <T: Any> Flow<T>.asFlux(): Flux<T> = FlowAsFlux(this)
18
19private class FlowAsFlux<T : Any>(private val flow: Flow<T>) : Flux<T>() {
20 override fun subscribe(subscriber: CoreSubscriber<in T>?) {
21 if (subscriber == null) throw NullPointerException()
22 val hasContext = subscriber.currentContext().isEmpty
23 val source = if (hasContext) flow.flowOn(subscriber.currentContext().asCoroutineContext()) else flow
24 subscriber.onSubscribe(FlowSubscription(source, subscriber))
25 }
26}