blob: a1404853aa1ba2eac4736f7e4ec7e7d07bfa364b [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental.rx2
import io.reactivex.Flowable
import kotlinx.coroutines.experimental.CoroutineDispatcher
import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.ProducerScope
import kotlinx.coroutines.experimental.reactive.publish
import kotlin.coroutines.experimental.*
/**
* Creates cold [flowable][Flowable] that will run a given [block] in a coroutine.
* Every time the returned flowable is subscribed, it starts a new coroutine.
* Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
*
* | **Coroutine action** | **Signal to subscriber**
* | -------------------------------------------- | ------------------------
* | `send` | `onNext`
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*
* The [context] for the new coroutine can be explicitly specified.
* See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
* The [coroutineContext] of the parent coroutine may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param block the coroutine code.
*/
@JvmOverloads // for binary compatibility with older code compiled before context had a default
public fun <T> rxFlowable(
context: CoroutineContext = DefaultDispatcher,
block: suspend ProducerScope<T>.() -> Unit
): Flowable<T> = Flowable.fromPublisher(publish(context, block = block))