blob: 38b116a83fe56663c738351bc045c3c0c3427c64 [file] [log] [blame]
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +03001/*
2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5@file:JvmMultifileClass
6@file:JvmName("FlowKt")
7@file:Suppress("unused")
8
9package kotlinx.coroutines.flow
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +030010
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030011import kotlinx.coroutines.*
12import kotlinx.coroutines.channels.*
Roman Elizarovb77a80c2019-05-29 17:42:58 +030013import kotlinx.coroutines.channels.Channel.Factory.OPTIONAL_CHANNEL
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030014import kotlinx.coroutines.flow.internal.*
Roman Elizarovb77a80c2019-05-29 17:42:58 +030015import kotlinx.coroutines.internal.*
16import kotlin.coroutines.*
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030017import kotlin.jvm.*
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +030018import kotlinx.coroutines.flow.unsafeFlow as flow
19
/**
 * Key of the property from which [DEFAULT_CONCURRENCY] takes its value.
 */
@FlowPreview
public const val DEFAULT_CONCURRENCY_PROPERTY_NAME: String = "kotlinx.coroutines.flow.defaultConcurrency"
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030025
/**
 * Concurrency limit applied by the [flattenMerge] and [flatMapMerge] operators when the caller
 * does not specify one.
 * The value is 16 unless overridden; on JVM it can be overridden through the
 * [DEFAULT_CONCURRENCY_PROPERTY_NAME] property.
 */
@FlowPreview
public val DEFAULT_CONCURRENCY: Int =
    systemProp(DEFAULT_CONCURRENCY_PROPERTY_NAME, 16, 1, Int.MAX_VALUE)
34
/**
 * Applies [transform] to each element emitted by the original flow, obtaining another flow per element,
 * and then concatenates and flattens those flows.
 *
 * This method is a shortcut for `map(transform).flattenConcat()`. See [flattenConcat].
 *
 * Note that even though this operator looks very familiar, its usage in regular application-specific
 * flows is discouraged. Most likely, a suspending operation in the [map] operator will be sufficient,
 * and linear transformations are much easier to reason about.
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> {
    val nested: Flow<Flow<R>> = map(transform)
    return nested.flattenConcat()
}
47
/**
 * Applies [transform] to each element emitted by the original flow, obtaining another flow per element,
 * and then merges and flattens those flows.
 *
 * [transform] is invoked *sequentially*; the flows it returns are then merged with a [concurrency]
 * limit on the number of flows collected at the same time.
 * This operator is a shortcut for `map(transform).flattenMerge(concurrency)`.
 * See [flattenMerge] for details.
 *
 * Note that even though this operator looks very familiar, its usage in regular application-specific
 * flows is discouraged. Most likely, a suspending operation in the [map] operator will be sufficient,
 * and linear transformations are much easier to reason about.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 *
 * @param concurrency controls the number of in-flight flows; at most [concurrency] flows are collected
 * at the same time. By default it is equal to [DEFAULT_CONCURRENCY].
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> {
    val nested: Flow<Flow<R>> = map(transform)
    return nested.flattenMerge(concurrency)
}
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030074
/**
 * Flattens the given flow of flows into a single flow in a sequential manner, without interleaving nested flows.
 * This method is conceptually identical to `flattenMerge(concurrency = 1)` but has a faster implementation.
 *
 * Inner flows are collected by this operator *sequentially*.
 */
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    // Forward every element of one inner flow before the outer flow hands over the next one.
    this@flattenConcat.collect { inner ->
        emitAll(inner)
    }
}
85
/**
 * Flattens the given flow of flows into a single flow, with a [concurrency] limit on the number of
 * flows that are collected at the same time.
 *
 * If [concurrency] is more than 1, then inner flows are collected by this operator *concurrently*.
 * With `concurrency == 1` this operator is identical to [flattenConcat].
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 *
 * @param concurrency controls the number of in-flight flows; at most [concurrency] flows are collected
 * at the same time. By default it is equal to [DEFAULT_CONCURRENCY].
 * @throws IllegalArgumentException if [concurrency] is not positive.
 */
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return when (concurrency) {
        // A single permit means no overlap at all, so the sequential operator is used instead.
        1 -> flattenConcat()
        else -> ChannelFlowMerge(this, concurrency)
    }
}
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +0300106
/**
 * Returns a flow that switches to a new flow produced by the [transform] function every time the original
 * flow emits a value. When the switch to a new flow is performed, the previous one is cancelled.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.switchMap { value ->
 *     flow {
 *         emit(value + value)
 *         delay(200)
 *         emit(value + "_last")
 *     }
 * }
 * ```
 * produces `aa bb b_last`
 */
@FlowPreview
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow { downstream ->
    var activeJob: Job? = null
    collect { value ->
        // Linearize calls to emit as an alternative to a channel: the previous inner flow is cancelled
        // and joined before the next one is started, so their emissions never overlap.
        activeJob?.let { job ->
            job.cancel(ChildCancelledException())
            job.join()
        }
        // Undispatched to have better user experience in case of synchronous flows
        activeJob = launch(start = CoroutineStart.UNDISPATCHED) {
            downstream.emitAll(transform(value))
        }
    }
}
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +0300140
/**
 * Channel-flow operator that merges a flow of flows while collecting at most [concurrency] inner flows
 * at the same time. [flattenMerge] creates it for concurrency levels other than 1.
 */
private class ChannelFlowMerge<T>(
    flow: Flow<Flow<T>>,
    private val concurrency: Int,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = OPTIONAL_CHANNEL
) : ChannelFlowOperator<Flow<T>, T>(flow, context, capacity) {

    // Produces a copy of this operator with the given context and capacity, keeping the same
    // upstream flow and concurrency limit.
    override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> {
        return ChannelFlowMerge(flow, concurrency, context, capacity)
    }

    // The actual merge implementation with concurrency limit.
    // Each inner flow is collected into [collector] by its own coroutine launched in [scope].
    private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector<T>) {
        // TODO real semaphore (#94)
        // A channel buffered to `concurrency` elements is used as a semaphore:
        // sending takes a concurrency permit and receiving gives it back.
        val permits = Channel<Unit>(concurrency)
        @Suppress("UNCHECKED_CAST")
        flow.collect { innerFlow ->
            permits.send(Unit) // Acquire concurrency permit
            scope.launch {
                try {
                    innerFlow.collect(collector)
                } finally {
                    permits.receive() // Release concurrency permit
                }
            }
        }
    }

    // Fast path in ChannelFlowOperator calls this function (channel was not created yet)
    override suspend fun flowCollect(collector: FlowCollector<T>) {
        // this function should not have been invoked when channel was explicitly requested
        check(capacity == OPTIONAL_CHANNEL)
        flowScope {
            val concurrentCollector = collector.asConcurrentFlowCollector()
            mergeImpl(this, concurrentCollector)
        }
    }

    // Slow path when output channel is required (and was created)
    override suspend fun collectTo(scope: ProducerScope<T>) {
        mergeImpl(scope, SendingCollector(scope))
    }

    // Extra properties for the string representation of this operator.
    override fun additionalToStringProps(): String {
        return "concurrency=$concurrency, "
    }
}
Vsevolod Tolstopyatov35f9ad52019-04-07 15:43:18 +0300183