Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| 3 | */ |
| 4 | |
| 5 | @file:JvmMultifileClass |
| 6 | @file:JvmName("FlowKt") |
| 7 | @file:Suppress("unused") |
| 8 | |
| 9 | package kotlinx.coroutines.flow |
Vsevolod Tolstopyatov | 8788488 | 2019-04-09 18:36:22 +0300 | [diff] [blame] | 10 | |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 11 | import kotlinx.coroutines.* |
| 12 | import kotlinx.coroutines.channels.* |
Roman Elizarov | b77a80c | 2019-05-29 17:42:58 +0300 | [diff] [blame] | 13 | import kotlinx.coroutines.channels.Channel.Factory.OPTIONAL_CHANNEL |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 14 | import kotlinx.coroutines.flow.internal.* |
Roman Elizarov | b77a80c | 2019-05-29 17:42:58 +0300 | [diff] [blame] | 15 | import kotlinx.coroutines.internal.* |
| 16 | import kotlin.coroutines.* |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 17 | import kotlin.jvm.* |
Vsevolod Tolstopyatov | 8788488 | 2019-04-09 18:36:22 +0300 | [diff] [blame] | 18 | import kotlinx.coroutines.flow.unsafeFlow as flow |
| 19 | |
/**
 * Key of the property from which [DEFAULT_CONCURRENCY] takes its value.
 */
@FlowPreview
public const val DEFAULT_CONCURRENCY_PROPERTY_NAME: String = "kotlinx.coroutines.flow.defaultConcurrency"
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 25 | |
/**
 * Default limit on the number of concurrently collected flows, used by the [flattenMerge] and
 * [flatMapMerge] operators.
 * It is 16 by default and can be changed on JVM using the [DEFAULT_CONCURRENCY_PROPERTY_NAME] property.
 */
@FlowPreview
public val DEFAULT_CONCURRENCY: Int = systemProp(
    DEFAULT_CONCURRENCY_PROPERTY_NAME,
    16, // the documented default
    1, // NOTE(review): presumably the smallest accepted value -- confirm against systemProp
    Int.MAX_VALUE // NOTE(review): presumably the largest accepted value -- confirm against systemProp
)
| 34 | |
/**
 * Applies [transform] to every element emitted by the original flow, obtaining another flow for each of them,
 * and then concatenates and flattens these flows into a single one.
 *
 * This method is a shortcut for `map(transform).flattenConcat()`. See [flattenConcat].
 *
 * Note that even though this operator looks very familiar, we discourage its usage in regular
 * application-specific flows. Most likely, a suspending operation in the [map] operator will be sufficient,
 * and linear transformations are much easier to reason about.
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> {
    val nested: Flow<Flow<R>> = map(transform)
    return nested.flattenConcat()
}
| 47 | |
/**
 * Applies [transform] to every element emitted by the original flow, obtaining another flow for each of them,
 * and then merges and flattens these flows into a single one.
 *
 * This operator calls [transform] *sequentially* and then merges the resulting flows with a [concurrency]
 * limit on the number of concurrently collected flows.
 * It is a shortcut for `map(transform).flattenMerge(concurrency)`.
 * See [flattenMerge] for details.
 *
 * Note that even though this operator looks very familiar, we discourage its usage in regular
 * application-specific flows. Most likely, a suspending operation in the [map] operator will be sufficient,
 * and linear transformations are much easier to reason about.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 *
 * @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
 * at the same time. By default it is equal to [DEFAULT_CONCURRENCY].
 * @param transform function that produces the flow to merge for each element of the original flow.
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> {
    val nested: Flow<Flow<R>> = map(transform)
    return nested.flattenMerge(concurrency)
}
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 74 | |
/**
 * Flattens the given flow of flows into a single flow in a sequential manner, without interleaving nested flows.
 * This method is conceptually identical to `flattenMerge(concurrency = 1)` but has a faster implementation.
 *
 * Inner flows are collected by this operator *sequentially*.
 */
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    // Every inner flow is re-emitted in full before the next one is taken from the outer flow
    this@flattenConcat.collect { inner -> emitAll(inner) }
}
| 85 | |
/**
 * Flattens the given flow of flows into a single flow with a [concurrency] limit on the number of
 * concurrently collected flows.
 *
 * If [concurrency] is more than 1, then inner flows are collected by this operator *concurrently*.
 * With `concurrency == 1` this operator is identical to [flattenConcat].
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 *
 * @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
 * at the same time. By default it is equal to [DEFAULT_CONCURRENCY].
 * @throws IllegalArgumentException if [concurrency] is not positive.
 */
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return when (concurrency) {
        // A single in-flight flow needs no concurrent machinery, so delegate to the sequential operator
        1 -> flattenConcat()
        else -> ChannelFlowMerge(this, concurrency)
    }
}
Vsevolod Tolstopyatov | 8788488 | 2019-04-09 18:36:22 +0300 | [diff] [blame] | 106 | |
/**
 * Returns a flow that switches to a new flow produced by the [transform] function every time the original flow
 * emits a value. When the switch to a new flow is performed, the previous one is cancelled.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.switchMap { value ->
 *     flow {
 *         emit(value + value)
 *         delay(200)
 *         emit(value + "_last")
 *     }
 * }
 * ```
 * produces `aa bb b_last`
 */
@FlowPreview
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow { downstream ->
    // Coroutine that collects the flow produced for the most recent upstream value, if any
    var activeChild: Job? = null
    collect { value ->
        // Cancel the previous child and wait for it to finish before starting the next one, so that calls
        // to emit are linearized without resorting to a channel
        val stale = activeChild
        if (stale != null) {
            stale.cancel(ChildCancelledException())
            stale.join()
        }
        // Started undispatched for a better user experience in case of synchronous flows
        activeChild = launch(start = CoroutineStart.UNDISPATCHED) {
            downstream.emitAll(transform(value))
        }
    }
}
Vsevolod Tolstopyatov | 8788488 | 2019-04-09 18:36:22 +0300 | [diff] [blame] | 140 | |
/**
 * Channel-flow operator behind [flattenMerge]: collects the flows emitted by the upstream flow of flows,
 * keeping at most [concurrency] of them in flight at the same time.
 */
private class ChannelFlowMerge<T>(
    flow: Flow<Flow<T>>,
    private val concurrency: Int,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = OPTIONAL_CHANNEL
) : ChannelFlowOperator<Flow<T>, T>(flow, context, capacity) {
    // Builds an operator over the same upstream flow and concurrency, but with the supplied context and capacity
    override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> {
        return ChannelFlowMerge(flow, concurrency, context, capacity)
    }

    // The actual merge implementation with the concurrency limit, shared by the fast and the slow paths below
    private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector<T>) {
        // TODO real semaphore (#94)
        // Until then a channel created with `concurrency` as its capacity plays the role of the semaphore
        val permits = Channel<Unit>(concurrency)
        @Suppress("UNCHECKED_CAST")
        flow.collect { innerFlow ->
            permits.send(Unit) // Acquire a concurrency permit before launching the next inner collection
            scope.launch {
                try {
                    innerFlow.collect(collector)
                } finally {
                    permits.receive() // Release the concurrency permit no matter how the collection ended
                }
            }
        }
    }

    // Fast path in ChannelFlowOperator calls this function (the channel was not created yet)
    override suspend fun flowCollect(collector: FlowCollector<T>) {
        // This function should not have been invoked when a channel was explicitly requested
        check(capacity == OPTIONAL_CHANNEL)
        flowScope {
            val concurrentCollector = collector.asConcurrentFlowCollector()
            mergeImpl(this, concurrentCollector)
        }
    }

    // Slow path when the output channel is required (and was created)
    override suspend fun collectTo(scope: ProducerScope<T>) {
        mergeImpl(scope, SendingCollector(scope))
    }

    override fun additionalToStringProps(): String {
        return "concurrency=$concurrency, "
    }
}
Vsevolod Tolstopyatov | 35f9ad5 | 2019-04-07 15:43:18 +0300 | [diff] [blame] | 183 | |