blob: cc19d3ae4d580e5ddd870e68a2945d2da60377e4 [file] [log] [blame]
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@file:JvmName("FlowKt")
@file:Suppress("unused")
package kotlinx.coroutines.flow
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow
/**
* Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then concatenating and flattening these flows.
* This method is identical to `flatMapMerge(concurrency = 1, bufferSize = 1)`
*
* Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
* Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
*/
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(mapper: suspend (value: T) -> Flow<R>): Flow<R> = flow {
collect { value ->
mapper(value).collect { innerValue ->
emit(innerValue)
}
}
}
/**
* Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then merging and flattening these flows.
*
* Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
* Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
*
* [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
* [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
*/
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int = 16, mapper: suspend (value: T) -> Flow<R>): Flow<R> {
return flow {
val semaphore = Channel<Unit>(concurrency)
val flatMap = SerializingFlatMapCollector(this, bufferSize)
coroutineScope {
collect { outerValue ->
semaphore.send(Unit) // Acquire concurrency permit
val inner = mapper(outerValue)
launch {
try {
inner.collect { value ->
flatMap.emit(value)
}
} finally {
semaphore.receive() // Release concurrency permit
}
}
}
}
}
}
/**
* Flattens the given flow of flows into a single flow in a sequentially manner, without interleaving nested flows.
* This method is identical to `flattenMerge(concurrency = 1, bufferSize = 1)
*/
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
collect { value ->
value.collect { innerValue ->
emit(innerValue)
}
}
}
/**
* Flattens the given flow of flows into a single flow.
* This method is identical to `flatMapMerge(concurrency, bufferSize) { it }`
*
* [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
* [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
*/
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMapMerge(concurrency, bufferSize) { it }
// Effectively serializes access to downstream collector from flatMap
private class SerializingFlatMapCollector<T>(
private val downstream: FlowCollector<T>,
private val bufferSize: Int
) {
// Let's try to leverage the fact that flatMapMerge is never contended
private val channel: Channel<Any?> by lazy { Channel<Any?>(bufferSize) } // Should be any, but KT-30796
private val inProgressLock = atomic(false)
private val sentValues = atomic(0)
public suspend fun emit(value: T) {
if (!inProgressLock.tryAcquire()) {
sentValues.incrementAndGet()
channel.send(value ?: NullSurrogate)
if (inProgressLock.tryAcquire()) {
helpEmit()
}
return
}
downstream.emit(value)
helpEmit()
}
@Suppress("UNCHECKED_CAST")
private suspend fun helpEmit() {
while (true) {
var element = channel.poll()
while (element != null) { // TODO receive or closed
if (element === NullSurrogate) downstream.emit(null as T)
else downstream.emit(element as T)
sentValues.decrementAndGet()
element = channel.poll()
}
inProgressLock.release()
// Enforce liveness of the algorithm
// TODO looks like isEmpty use-case
if (sentValues.value == 0 || !inProgressLock.tryAcquire()) break
}
}
}
private fun AtomicBoolean.tryAcquire(): Boolean = compareAndSet(false, true)
private fun AtomicBoolean.release() {
value = false
}