Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| 3 | */ |
| 4 | |
| 5 | @file:JvmMultifileClass |
| 6 | @file:JvmName("FlowKt") |
| 7 | @file:Suppress("unused") |
| 8 | |
| 9 | package kotlinx.coroutines.flow |
Vsevolod Tolstopyatov | 8788488 | 2019-04-09 18:36:22 +0300 | [diff] [blame^] | 10 | |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 11 | import kotlinx.atomicfu.* |
| 12 | import kotlinx.coroutines.* |
| 13 | import kotlinx.coroutines.channels.* |
| 14 | import kotlinx.coroutines.flow.internal.* |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 15 | import kotlin.jvm.* |
Vsevolod Tolstopyatov | 8788488 | 2019-04-09 18:36:22 +0300 | [diff] [blame^] | 16 | import kotlinx.coroutines.flow.unsafeFlow as flow |
| 17 | |
/**
 * Maps every element of the original flow to another flow using [mapper] and emits the elements of
 * those flows one flow after another, i.e. the resulting flows are concatenated and flattened.
 * This method is identical to `flatMapMerge(concurrency = 1, bufferSize = 1)`
 *
 * Note that even though this operator looks very familiar, we discourage its usage in regular application-specific flows.
 * Most likely, a suspending operation in the [map] operator will be sufficient, and linear transformations are much easier to reason about.
 *
 * @param mapper suspending function that produces the inner flow for each upstream value.
 * @return a flow that emits every element of every inner flow, in upstream order.
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(mapper: suspend (value: T) -> Flow<R>): Flow<R> {
    return flow {
        collect { outerValue ->
            // The inner flow is collected to completion before the next upstream value is processed.
            val innerFlow = mapper(outerValue)
            innerFlow.collect { emit(it) }
        }
    }
}
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 33 | |
/**
 * Transforms elements emitted by the original flow by applying [mapper], that returns another flow,
 * and then merging and flattening these flows.
 *
 * Note that even though this operator looks very familiar, we discourage its usage in regular application-specific flows.
 * Most likely, a suspending operation in the [map] operator will be sufficient, and linear transformations are much easier to reason about.
 *
 * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
 * [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
 *
 * @param concurrency maximum number of inner flows collected at the same time; must be positive.
 * @param bufferSize capacity of the queue that holds elements waiting for the downstream collector; must be non-negative.
 * @param mapper suspending function that produces the inner flow for each upstream value.
 * @throws IllegalArgumentException if [concurrency] is not positive or [bufferSize] is negative.
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int = 16, mapper: suspend (value: T) -> Flow<R>): Flow<R> {
    // Both values are used as channel capacities below. A zero-capacity (rendezvous) semaphore would
    // suspend the very first `send` forever, and the special negative capacity selects a conflated
    // channel, which neither counts permits nor keeps every queued element. Reject them up front.
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    require(bufferSize >= 0) { "Expected non-negative buffer size, but had $bufferSize" }
    // NOTE(review): bufferSize == 0 yields a rendezvous queue; confirm the serializing collector stays live with it.
    return flow {
        // Buffered channel used as a counting semaphore: every queued Unit is one occupied permit.
        val semaphore = Channel<Unit>(concurrency)
        val flatMap = SerializingFlatMapCollector(this, bufferSize)
        coroutineScope {
            collect { outerValue ->
                semaphore.send(Unit) // Acquire concurrency permit
                val inner = mapper(outerValue)
                launch {
                    try {
                        inner.collect { value ->
                            flatMap.emit(value)
                        }
                    } finally {
                        semaphore.receive() // Release concurrency permit
                    }
                }
            }
        }
    }
}
| 65 | |
/**
 * Flattens the given flow of flows into a single flow sequentially, without interleaving nested flows.
 * This method is identical to `flattenMerge(concurrency = 1, bufferSize = 1)`
 *
 * @return a flow that emits all elements of the first nested flow, then all elements of the second one, and so on.
 */
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> {
    return flow {
        collect { nestedFlow ->
            // Each nested flow is fully collected before the next one is requested from upstream.
            nestedFlow.collect { emit(it) }
        }
    }
}
| 78 | |
/**
 * Flattens the given flow of flows into a single flow.
 * This method is identical to `flatMapMerge(concurrency, bufferSize) { it }`
 *
 * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
 * [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
 *
 * @return a flow that emits the elements of the nested flows as produced by [flatMapMerge] with an identity mapper.
 */
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> {
    // The nested flows are already the flows to merge, so the mapper simply returns its argument.
    return flatMapMerge(concurrency, bufferSize) { nestedFlow -> nestedFlow }
}
| 88 | |
| 89 | |
/**
 * Effectively serializes access to the downstream collector from flatMap.
 *
 * An emitter that wins [inProgressLock] hands its value to [downstream] directly and then drains whatever
 * other emitters queued in the meantime. An emitter that loses the race parks its value in [channel]
 * and, once the value is queued, makes one more attempt to become the drainer itself.
 *
 * @param downstream collector that receives all values; it is only invoked while [inProgressLock] is held.
 * @param bufferSize capacity passed to the lazily created [channel].
 */
private class SerializingFlatMapCollector<T>(
    private val downstream: FlowCollector<T>,
    private val bufferSize: Int
) {

    // Let's try to leverage the fact that flatMapMerge is never contended
    private val channel: Channel<Any?> by lazy { Channel<Any?>(bufferSize) } // Should be any, but KT-30796
    private val inProgressLock = atomic(false)

    // Number of values that were announced for [channel] and not yet delivered downstream
    private val sentValues = atomic(0)

    /**
     * Delivers [value] to [downstream], either directly (when the lock is free) or through [channel].
     */
    public suspend fun emit(value: T) {
        if (inProgressLock.tryAcquire()) {
            // Fast path: nobody else is emitting, so bypass the channel completely
            downstream.emit(value)
            helpEmit()
            return
        }

        // Slow path: announce the value first, then queue it (null is replaced by NullSurrogate)
        sentValues.incrementAndGet()
        channel.send(value ?: NullSurrogate)
        if (inProgressLock.tryAcquire()) {
            helpEmit()
        }
    }

    /**
     * Drains [channel] and releases [inProgressLock]; must be called with the lock held.
     * Repeats for as long as announced values remain and the lock can be taken again.
     */
    private suspend fun helpEmit() {
        do {
            drainChannel()
            inProgressLock.release()
            // Enforce liveness of the algorithm
            // TODO looks like isEmpty use-case
        } while (sentValues.value != 0 && inProgressLock.tryAcquire())
    }

    /**
     * Forwards every element currently available in [channel] to [downstream], mapping NullSurrogate back to `null`.
     */
    @Suppress("UNCHECKED_CAST")
    private suspend fun drainChannel() {
        while (true) { // TODO receive or closed
            val queued = channel.poll() ?: break
            downstream.emit((if (queued === NullSurrogate) null else queued) as T)
            sentValues.decrementAndGet()
        }
    }
}
Vsevolod Tolstopyatov | 35f9ad5 | 2019-04-07 15:43:18 +0300 | [diff] [blame] | 133 | |
/**
 * Attempts to flip this flag from `false` to `true`.
 *
 * @return `true` if this call changed the flag, `false` if it was already `true`.
 */
private fun AtomicBoolean.tryAcquire(): Boolean {
    return compareAndSet(false, true)
}
| 135 | |
/**
 * Resets this flag to `false`, the state from which [tryAcquire] can succeed again.
 */
private fun AtomicBoolean.release() {
    this.value = false
}