blob: cc19d3ae4d580e5ddd870e68a2945d2da60377e4 [file] [log] [blame]
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +03001/*
2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5@file:JvmMultifileClass
6@file:JvmName("FlowKt")
7@file:Suppress("unused")
8
9package kotlinx.coroutines.flow
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +030010
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030011import kotlinx.atomicfu.*
12import kotlinx.coroutines.*
13import kotlinx.coroutines.channels.*
14import kotlinx.coroutines.flow.internal.*
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030015import kotlin.jvm.*
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +030016import kotlinx.coroutines.flow.unsafeFlow as flow
17
18/**
19 * Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then concatenating and flattening these flows.
20 * This method is identical to `flatMapMerge(concurrency = 1, bufferSize = 1)`
21 *
22 * Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
23 * Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
24 */
25@FlowPreview
26public fun <T, R> Flow<T>.flatMapConcat(mapper: suspend (value: T) -> Flow<R>): Flow<R> = flow {
27 collect { value ->
28 mapper(value).collect { innerValue ->
29 emit(innerValue)
30 }
31 }
32}
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030033
34/**
35 * Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then merging and flattening these flows.
36 *
37 * Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
38 * Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
39 *
40 * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +030041 * [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030042 */
43@FlowPreview
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +030044public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int = 16, mapper: suspend (value: T) -> Flow<R>): Flow<R> {
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030045 return flow {
46 val semaphore = Channel<Unit>(concurrency)
47 val flatMap = SerializingFlatMapCollector(this, bufferSize)
48 coroutineScope {
49 collect { outerValue ->
50 semaphore.send(Unit) // Acquire concurrency permit
51 val inner = mapper(outerValue)
52 launch {
Vsevolod Tolstopyatov35f9ad52019-04-07 15:43:18 +030053 try {
54 inner.collect { value ->
55 flatMap.emit(value)
56 }
57 } finally {
58 semaphore.receive() // Release concurrency permit
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030059 }
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030060 }
61 }
62 }
63 }
64}
65
66/**
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +030067 * Flattens the given flow of flows into a single flow in a sequentially manner, without interleaving nested flows.
68 * This method is identical to `flattenMerge(concurrency = 1, bufferSize = 1)
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030069 */
70@FlowPreview
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +030071public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030072 collect { value ->
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +030073 value.collect { innerValue ->
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030074 emit(innerValue)
75 }
76 }
77}
78
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +030079/**
80 * Flattens the given flow of flows into a single flow.
81 * This method is identical to `flatMapMerge(concurrency, bufferSize) { it }`
82 *
83 * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
84 * [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
85 */
86@FlowPreview
87public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMapMerge(concurrency, bufferSize) { it }
88
89
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030090// Effectively serializes access to downstream collector from flatMap
91private class SerializingFlatMapCollector<T>(
92 private val downstream: FlowCollector<T>,
93 private val bufferSize: Int
94) {
95
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +030096 // Let's try to leverage the fact that flatMapMerge is never contended
Vsevolod Tolstopyatov35f9ad52019-04-07 15:43:18 +030097 private val channel: Channel<Any?> by lazy { Channel<Any?>(bufferSize) } // Should be any, but KT-30796
98 private val inProgressLock = atomic(false)
99 private val sentValues = atomic(0)
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +0300100
Vsevolod Tolstopyatov35f9ad52019-04-07 15:43:18 +0300101 public suspend fun emit(value: T) {
102 if (!inProgressLock.tryAcquire()) {
103 sentValues.incrementAndGet()
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +0300104 channel.send(value ?: NullSurrogate)
Vsevolod Tolstopyatov35f9ad52019-04-07 15:43:18 +0300105 if (inProgressLock.tryAcquire()) {
106 helpEmit()
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +0300107 }
108 return
109 }
110
111 downstream.emit(value)
Vsevolod Tolstopyatov35f9ad52019-04-07 15:43:18 +0300112 helpEmit()
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +0300113 }
114
115 @Suppress("UNCHECKED_CAST")
Vsevolod Tolstopyatov35f9ad52019-04-07 15:43:18 +0300116 private suspend fun helpEmit() {
117 while (true) {
118 var element = channel.poll()
119 while (element != null) { // TODO receive or closed
120 if (element === NullSurrogate) downstream.emit(null as T)
121 else downstream.emit(element as T)
122 sentValues.decrementAndGet()
123 element = channel.poll()
124 }
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +0300125
Vsevolod Tolstopyatov35f9ad52019-04-07 15:43:18 +0300126 inProgressLock.release()
127 // Enforce liveness of the algorithm
128 // TODO looks like isEmpty use-case
129 if (sentValues.value == 0 || !inProgressLock.tryAcquire()) break
130 }
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +0300131 }
132}
Vsevolod Tolstopyatov35f9ad52019-04-07 15:43:18 +0300133
134private fun AtomicBoolean.tryAcquire(): Boolean = compareAndSet(false, true)
135
136private fun AtomicBoolean.release() {
137 value = false
138}