blob: c137afcb45e5d3a3531beeefc0bf8656354cfded [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.experimental.rx2
6
7import io.reactivex.Flowable
Phil Glassff850b02017-10-16 23:31:32 +01008import kotlinx.coroutines.experimental.CoroutineDispatcher
9import kotlinx.coroutines.experimental.CoroutineScope
10import kotlinx.coroutines.experimental.DefaultDispatcher
11import kotlinx.coroutines.experimental.Job
Roman Elizarov50e32212017-03-10 17:40:50 +030012import kotlinx.coroutines.experimental.channels.ProducerScope
13import kotlinx.coroutines.experimental.reactive.publish
Roman Elizarovc7d10a42018-03-13 18:28:42 +030014import kotlin.coroutines.experimental.*
Roman Elizarov50e32212017-03-10 17:40:50 +030015
/**
 * Builds a cold [Flowable] whose items are produced by running [block] inside a coroutine.
 * A fresh coroutine is started for each subscription to the returned flowable.
 * The coroutine emits items by calling `send`; when the subscriber unsubscribes, the running coroutine is cancelled.
 *
 * Calls to `send` suspend as needed so that subscriber back-pressure is respected and
 * `onNext` is never invoked concurrently.
 *
 * | **Coroutine action**                         | **Signal to subscriber**
 * | -------------------------------------------- | ------------------------
 * | `send`                                       | `onNext`
 * | Normal completion or `close` without cause   | `onComplete`
 * | Failure with exception or `close` with cause | `onError`
 *
 * The [context] of the new coroutine may be specified explicitly.
 * See [CoroutineDispatcher] for the standard context implementations provided by `kotlinx.coroutines`.
 * When the [coroutineContext] of a parent coroutine is passed,
 * the [Job] of the resulting coroutine becomes a child of the parent coroutine's job.
 * If the context contains neither a dispatcher nor any other [ContinuationInterceptor], [DefaultDispatcher] is used.
 *
 * @param context context of the coroutine. The default value is [DefaultDispatcher].
 * @param block the coroutine code.
 */
@JvmOverloads // for binary compatibility with older code compiled before context had a default
public fun <T> rxFlowable(
    context: CoroutineContext = DefaultDispatcher,
    block: suspend ProducerScope<T>.() -> Unit
): Flowable<T> {
    // Build the reactive-streams publisher backed by the coroutine first, then adapt it to RxJava.
    val coroutinePublisher = publish(context, block = block)
    return Flowable.fromPublisher(coroutinePublisher)
}