blob: 63649f2a4e8eaa71f344af90773e8f424d5fbdf3 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
5package kotlinx.coroutines.experimental.reactive
6
Francesco Vascoe19ee042017-11-29 22:12:35 +01007import kotlinx.coroutines.experimental.DefaultDispatcher
Roman Elizarov331750b2017-02-15 17:59:17 +03008import kotlinx.coroutines.experimental.channels.ReceiveChannel
9import org.reactivestreams.Publisher
10import kotlin.coroutines.experimental.CoroutineContext
11
12/**
13 * Converts a stream of elements received from the channel to the hot reactive publisher.
14 *
15 * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
16 * they'll receive values in round-robin way.
17 *
18 * @param context -- the coroutine context from which the resulting observable is going to be signalled
19 */
Francesco Vascoe19ee042017-11-29 22:12:35 +010020public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = DefaultDispatcher): Publisher<T> = publish(context) {
Roman Elizarov3c3aed72017-03-09 12:31:59 +030021 for (t in this@asPublisher)
Roman Elizarov331750b2017-02-15 17:59:17 +030022 send(t)
23}
Roman Elizarov3c3aed72017-03-09 12:31:59 +030024
25/**
26 * @suppress **Deprecated**: Renamed to [asPublisher]
27 */
28@Deprecated(message = "Renamed to `asPublisher`",
29 replaceWith = ReplaceWith("asPublisher(context)"))
30public fun <T> ReceiveChannel<T>.toPublisher(context: CoroutineContext): Publisher<T> = asPublisher(context)