FlatMap-related API rework
* Deprecate flatMap to see the feedback about it
* Introduce flatMapMerge and flatMapConcat (concurrent and sequential versions)
* Rename concat to flattenMerge and flattenConcat to be more like Sequence
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index ef57668..68baa5f 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -793,8 +793,6 @@
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
- public static final fun concatenate (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
- public static final fun concatenate (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
@@ -807,8 +805,12 @@
public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
- public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
- public static synthetic fun flatMap$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flattenConcat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow;
+ public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/Flow;
@@ -820,10 +822,6 @@
public static final fun fold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
- public static final fun merge (Ljava/lang/Iterable;II)Lkotlinx/coroutines/flow/Flow;
- public static final fun merge (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow;
- public static synthetic fun merge$default (Ljava/lang/Iterable;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
- public static synthetic fun merge$default (Lkotlinx/coroutines/flow/Flow;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -851,8 +849,10 @@
public static final fun BehaviourSubject ()Ljava/lang/Object;
public static final fun PublishSubject ()Ljava/lang/Object;
public static final fun ReplaySubject ()Ljava/lang/Object;
- public static final fun concat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index 5ab2dcf..b5da8ee 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -64,18 +64,44 @@
@Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
public fun <T> Flow<T>.subscribe(onEach: (T) -> Unit, onError: (Throwable) -> Unit): Unit = error("Should not be called")
-/** @suppress **/
+
+/**
+ * Note that this replacement is sequential (`concat`) by default.
+ * For concurrent flatMap [flatMapMerge] can be used instead.
+ * @suppress
+ */
@Deprecated(
level = DeprecationLevel.ERROR,
- message = "Flow analogue is named concatenate",
- replaceWith = ReplaceWith("concatenate()")
+ message = "Flow analogue is named flatMapConcat",
+ replaceWith = ReplaceWith("flatMapConcat(mapper)")
)
-public fun <T> Flow<T>.concat(): Flow<T> = error("Should not be called")
+public fun <T, R> Flow<T>.flatMap(mapper: suspend (T) -> Flow<R>): Flow<R> = error("Should not be called")
/** @suppress **/
@Deprecated(
level = DeprecationLevel.ERROR,
- message = "Flow analogue is named concatenate",
- replaceWith = ReplaceWith("concatenate(mapper)")
+ message = "Flow analogue is named flatMapConcat",
+ replaceWith = ReplaceWith("flatMapConcat(mapper)")
)
public fun <T, R> Flow<T>.concatMap(mapper: (T) -> Flow<R>): Flow<R> = error("Should not be called")
+
+
+/**
+ * Note that this replacement is sequential (`concat`) by default.
+ * For concurrent flatMap [flattenMerge] can be used instead.
+ * @suppress
+ */
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue is named flattenConcat",
+ replaceWith = ReplaceWith("flattenConcat()")
+)
+public fun <T> Flow<Flow<T>>.merge(): Flow<T> = error("Should not be called")
+
+/** @suppress **/
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue is named flattenConcat",
+ replaceWith = ReplaceWith("flattenConcat()")
+)
+public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = error("Should not be called")
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
index 84257a1..cc19d3a 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
@@ -7,12 +7,29 @@
@file:Suppress("unused")
package kotlinx.coroutines.flow
+
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
-import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlin.jvm.*
+import kotlinx.coroutines.flow.unsafeFlow as flow
+
+/**
+ * Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then concatenating and flattening these flows.
+ * This method is identical to `flatMapMerge(concurrency = 1, bufferSize = 1)`
+ *
+ * Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
+ * Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
+ */
+@FlowPreview
+public fun <T, R> Flow<T>.flatMapConcat(mapper: suspend (value: T) -> Flow<R>): Flow<R> = flow {
+ collect { value ->
+ mapper(value).collect { innerValue ->
+ emit(innerValue)
+ }
+ }
+}
/**
* Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then merging and flattening these flows.
@@ -21,10 +38,10 @@
* Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
*
* [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
- * [concurrency] parameter controls the size of in-flight flows.
+ * [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
*/
@FlowPreview
-public fun <T, R> Flow<T>.flatMap(concurrency: Int = 16, bufferSize: Int = 16, mapper: suspend (value: T) -> Flow<R>): Flow<R> {
+public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int = 16, mapper: suspend (value: T) -> Flow<R>): Flow<R> {
return flow {
val semaphore = Channel<Unit>(concurrency)
val flatMap = SerializingFlatMapCollector(this, bufferSize)
@@ -47,56 +64,36 @@
}
/**
- * Merges the given sequence of flows into a single flow with no guarantees on the order.
- *
- * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
- * [concurrency] parameter controls the size of in-flight flows.
+ * Flattens the given flow of flows into a single flow in a sequentially manner, without interleaving nested flows.
+ * This method is identical to `flattenMerge(concurrency = 1, bufferSize = 1)
*/
@FlowPreview
-public fun <T> Iterable<Flow<T>>.merge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = asFlow().flatMap(concurrency, bufferSize) { it }
-
-/**
- * Merges the given flow of flows into a single flow with no guarantees on the order.
- *
- * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
- * [concurrency] parameter controls the size of in-flight flows.
- */
-@FlowPreview
-public fun <T> Flow<Flow<T>>.merge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMap(concurrency, bufferSize) { it }
-
-/**
- * Concatenates values of each flow sequentially, without interleaving them.
- */
-@FlowPreview
-public fun <T> Flow<Flow<T>>.concatenate(): Flow<T> = flow {
- collect {
- val inner = it
- inner.collect { value ->
- emit(value)
- }
- }
-}
-
-/**
- * Transforms each value of the given flow into flow of another type and then flattens these flows
- * sequentially, without interleaving them.
- */
-@FlowPreview
-public fun <T, R> Flow<T>.concatenate(mapper: suspend (T) -> Flow<R>): Flow<R> = flow {
+public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
collect { value ->
- mapper(value).collect { innerValue ->
+ value.collect { innerValue ->
emit(innerValue)
}
}
}
+/**
+ * Flattens the given flow of flows into a single flow.
+ * This method is identical to `flatMapMerge(concurrency, bufferSize) { it }`
+ *
+ * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
+ * [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
+ */
+@FlowPreview
+public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMapMerge(concurrency, bufferSize) { it }
+
+
// Effectively serializes access to downstream collector from flatMap
private class SerializingFlatMapCollector<T>(
private val downstream: FlowCollector<T>,
private val bufferSize: Int
) {
- // Let's try to leverage the fact that flatMap is never contended
+ // Let's try to leverage the fact that flatMapMerge is never contended
private val channel: Channel<Any?> by lazy { Channel<Any?>(bufferSize) } // Should be any, but KT-30796
private val inProgressLock = atomic(false)
private val sentValues = atomic(0)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateMapTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateMapTest.kt
deleted file mode 100644
index d4e15a8..0000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateMapTest.kt
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
-import kotlin.test.*
-
-class ConcatenateMapTest : TestBase() {
- @Test
- fun testConcatenate() = runTest {
- val n = 100
- val sum = flow {
- repeat(n) {
- emit(it + 1) // 1..100
- }
- }.concatenate { value ->
- // 1 + (1 + 2) + (1 + 2 + 3) + ... (1 + .. + n)
- flow {
- repeat(value) {
- emit(it + 1)
- }
- }
- }.sum()
-
- assertEquals(n * (n + 1) * (n + 2) / 6, sum)
- }
-
- @Test
- fun testSingle() = runTest {
- val flow = flow {
- repeat(100) {
- emit(it)
- }
- }.concatenate { value ->
- if (value == 99) flowOf(42)
- else flowOf()
- }
-
- val value = flow.single()
- assertEquals(42, value)
- }
-
- @Test
- fun testFailure() = runTest {
- var finally = false
- val latch = Channel<Unit>()
- val flow = flow {
- coroutineScope {
- launch {
- latch.send(Unit)
- hang { finally = true }
- }
-
- emit(1)
- }
- }.concatenate {
- flow<Int> {
- latch.receive()
- throw TestException()
- }
- }
-
- assertFailsWith<TestException> { flow.count() }
- assertTrue(finally)
- }
-
- @Test
- fun testFailureInMapOperation() = runTest {
- val latch = Channel<Unit>()
- val flow = flow {
- coroutineScope {
- launch {
- latch.send(Unit)
- hang { expect(3) }
- }
-
- expect(1)
- emit(1)
- }
- }.concatenate<Int, Int> {
- latch.receive()
- expect(2)
- throw TestException()
- flowOf<Int>() // Workaround for KT-30642, return type should not be Nothing
- }
-
- assertFailsWith<TestException> { flow.count() }
- finish(4)
- }
-
- @Test
- fun testContext() = runTest {
- val captured = ArrayList<String>()
- val flow = flowOf(1)
- .flowOn(NamedDispatchers("irrelevant"))
- .concatenate {
- flow {
- captured += NamedDispatchers.name()
- emit(it)
- }
- }
-
- flow.flowOn(NamedDispatchers("1")).sum()
- flow.flowOn(NamedDispatchers("2")).sum()
- assertEquals(listOf("1", "2"), captured)
- }
-
-
- @Test
- fun testIsolatedContext() = runTest {
- val flow = flowOf(1)
- .flowOn(NamedDispatchers("irrelevant"))
- .flowWith(NamedDispatchers("inner")) {
- concatenate {
- flow {
- expect(2)
- assertEquals("inner", NamedDispatchers.name())
- emit(it)
- }
- }
- }.flowOn(NamedDispatchers("irrelevant"))
- .concatenate {
- flow {
- expect(3)
- assertEquals("outer", NamedDispatchers.name())
- emit(it)
- }
- }.flowOn(NamedDispatchers("outer"))
-
- expect(1)
- assertEquals(1, flow.single())
- finish(4)
- }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateTest.kt
deleted file mode 100644
index 6a50bc9..0000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateTest.kt
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlin.test.*
-
-class ConcatenateTest : TestBase() {
- @Test
- fun testConcatenate() = runTest {
- val n = 100
- val sum = (1..n).asFlow()
- .map { value ->
- flow {
- repeat(value) {
- emit(it + 1)
- }
- }
- }.concatenate().sum()
- assertEquals(n * (n + 1) * (n + 2) / 6, sum)
- }
-
- @Test
- fun testSingle() = runTest {
- val flows = flow {
- repeat(100) {
- if (it == 99) emit(flowOf(42))
- else emit(flowOf())
- }
- }
-
- val value = flows.concatenate().single()
- assertEquals(42, value)
- }
-
-
- @Test
- fun testContext() = runTest {
- val flow = flow {
- emit(flow {
- expect(2)
- assertEquals("first", NamedDispatchers.name())
- emit(1)
- }.flowOn(NamedDispatchers("first")))
-
- emit(flow {
- expect(3)
- assertEquals("second", NamedDispatchers.name())
- emit(1)
- }.flowOn(NamedDispatchers("second")))
- }.concatenate().flowOn(NamedDispatchers("first"))
-
- expect(1)
- assertEquals(2, flow.sum())
- finish(4)
- }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapBaseTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapBaseTest.kt
new file mode 100644
index 0000000..766b4a4
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapBaseTest.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.operators
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+
+abstract class FlatMapBaseTest : TestBase() {
+
+ abstract fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T>
+
+ @Test
+ fun testFlatMap() = runTest {
+ val n = 100
+ val sum = (1..100).asFlow()
+ .flatMap { value ->
+ // 1 + (1 + 2) + (1 + 2 + 3) + ... (1 + .. + n)
+ flow {
+ repeat(value) {
+ emit(it + 1)
+ }
+ }
+ }.sum()
+
+ assertEquals(n * (n + 1) * (n + 2) / 6, sum)
+ }
+
+ @Test
+ fun testSingle() = runTest {
+ val flow = flow {
+ repeat(100) {
+ emit(it)
+ }
+ }.flatMap { value ->
+ if (value == 99) flowOf(42)
+ else flowOf()
+ }
+
+ val value = flow.single()
+ assertEquals(42, value)
+ }
+
+ @Test
+ fun testContext() = runTest {
+ val captured = ArrayList<String>()
+ val flow = flowOf(1)
+ .flowOn(NamedDispatchers("irrelevant"))
+ .flatMap {
+ captured += NamedDispatchers.name()
+ flow {
+ captured += NamedDispatchers.name()
+ emit(it)
+ }
+ }
+
+ flow.flowOn(NamedDispatchers("1")).sum()
+ flow.flowOn(NamedDispatchers("2")).sum()
+ assertEquals(listOf("1", "1", "2", "2"), captured)
+ }
+
+ @Test
+ fun testIsolatedContext() = runTest {
+ val flow = flowOf(1)
+ .flowOn(NamedDispatchers("irrelevant"))
+ .flowWith(NamedDispatchers("inner")) {
+ flatMap {
+ flow {
+ assertEquals("inner", NamedDispatchers.name())
+ emit(it)
+ }
+ }
+ }.flowOn(NamedDispatchers("irrelevant"))
+ .flatMap {
+ flow {
+ assertEquals("outer", NamedDispatchers.name())
+ emit(it)
+ }
+ }.flowOn(NamedDispatchers("outer"))
+
+ assertEquals(1, flow.singleOrNull())
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapConcatTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapConcatTest.kt
new file mode 100644
index 0000000..4bf1b16
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapConcatTest.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.operators.*
+import kotlin.test.*
+
+class FlatMapConcatTest : FlatMapBaseTest() {
+
+ override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = flatMapConcat(mapper = mapper)
+
+ @Test
+ fun testFlatMapConcurrency() = runTest {
+ var concurrentRequests = 0
+ val flow = (1..100).asFlow().flatMapConcat { value ->
+ flow {
+ ++concurrentRequests
+ emit(value)
+ delay(Long.MAX_VALUE)
+ }
+ }
+
+ val consumer = launch {
+ flow.collect { value ->
+ expect(value)
+ }
+ }
+
+ repeat(4) {
+ yield()
+ }
+
+ assertEquals(1, concurrentRequests)
+ consumer.cancelAndJoin()
+ finish(2)
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeBaseTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeBaseTest.kt
new file mode 100644
index 0000000..fe37ae6
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeBaseTest.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+
+package kotlinx.coroutines.flow.operators
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+
+abstract class FlatMapMergeBaseTest : FlatMapBaseTest() {
+ @Test
+ fun testFailureCancellation() = runTest {
+ val flow = flow {
+ expect(2)
+ emit(1)
+ expect(3)
+ emit(2)
+ expect(4)
+ }.flatMap {
+ if (it == 1) flow {
+ hang { expect(6) }
+ } else flow<Int> {
+ expect(5)
+ throw TestException()
+ }
+ }
+
+ expect(1)
+ assertFailsWith<TestException> { flow.singleOrNull() }
+ finish(7)
+ }
+
+ @Test
+ fun testConcurrentFailure() = runTest {
+ val latch = Channel<Unit>()
+ val flow = flow {
+ expect(2)
+ emit(1)
+ expect(3)
+ emit(2)
+ }.flatMap {
+ if (it == 1) flow<Int> {
+ expect(5)
+ latch.send(Unit)
+ hang {
+ expect(7)
+ throw TestException2()
+
+ }
+ } else {
+ expect(4)
+ latch.receive()
+ expect(6)
+ throw TestException()
+ }
+ }
+
+ expect(1)
+ assertFailsWith<TestException>(flow)
+ finish(8)
+ }
+
+ @Test
+ fun testFailureInMapOperationCancellation() = runTest {
+ val latch = Channel<Unit>()
+ val flow = flow {
+ expect(2)
+ emit(1)
+ expect(3)
+ emit(2)
+ expectUnreached()
+ }.flatMap {
+ if (it == 1) flow<Int> {
+ expect(5)
+ latch.send(Unit)
+ hang { expect(7) }
+ } else {
+ expect(4)
+ latch.receive()
+ expect(6)
+ throw TestException()
+ }
+ }
+
+ expect(1)
+ assertFailsWith<TestException> { flow.count() }
+ finish(8)
+ }
+
+ @Test
+ abstract fun testFlatMapConcurrency()
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
new file mode 100644
index 0000000..cbd68eb
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.operators.*
+import kotlin.test.*
+
+class FlatMapMergeTest : FlatMapMergeBaseTest() {
+
+ override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = flatMapMerge(mapper = mapper)
+
+ @Test
+ override fun testFlatMapConcurrency() = runTest {
+ var concurrentRequests = 0
+ val flow = (1..100).asFlow().flatMapMerge(concurrency = 2) { value ->
+ flow {
+ ++concurrentRequests
+ emit(value)
+ delay(Long.MAX_VALUE)
+ }
+ }
+
+ val consumer = launch {
+ flow.collect { value ->
+ expect(value)
+ }
+ }
+
+ repeat(4) {
+ yield()
+ }
+
+ assertEquals(2, concurrentRequests)
+ consumer.cancelAndJoin()
+ finish(3)
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapTest.kt
deleted file mode 100644
index ad5d46b..0000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapTest.kt
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
-import kotlin.test.*
-
-class FlatMapTest : TestBase() {
-
- @Test
- fun testFlatMap() = runTest {
- val n = 100
- val sum = (1..100).asFlow()
- .flatMap { value ->
- // 1 + (1 + 2) + (1 + 2 + 3) + ... (1 + .. + n)
- flow {
- repeat(value) {
- emit(it + 1)
- }
- }
- }.sum()
-
- assertEquals(n * (n + 1) * (n + 2) / 6, sum)
- }
-
- @Test
- fun testSingle() = runTest {
- val flow = flow {
- repeat(100) {
- emit(it)
- }
- }.flatMap { value ->
- if (value == 99) flowOf(42)
- else flowOf()
- }
-
- val value = flow.single()
- assertEquals(42, value)
- }
-
- @Test
- fun testFailureCancellation() = runTest {
- val flow = flow {
- expect(2)
- emit(1)
- expect(3)
- emit(2)
- expect(4)
- }.flatMap {
- if (it == 1) flow {
- hang { expect(6) }
- } else flow<Int> {
- expect(5)
- throw TestException()
- }
- }
-
- expect(1)
- assertFailsWith<TestException> { flow.singleOrNull() }
- finish(7)
- }
-
- @Test
- fun testFailureInMapOperationCancellation() = runTest {
- val latch = Channel<Unit>()
- val flow = flow {
- expect(2)
- emit(1)
- expect(3)
- emit(2)
- expectUnreached()
- }.flatMap {
- if (it == 1) flow<Int> {
- expect(5)
- latch.send(Unit)
- hang { expect(7) }
- } else {
- expect(4)
- latch.receive()
- expect(6)
- throw TestException()
- }
- }
-
- expect(1)
- assertFailsWith<TestException> { flow.count() }
- finish(8)
- }
-
- @Test
- fun testConcurrentFailure() = runTest {
- val latch = Channel<Unit>()
- val flow = flow {
- expect(2)
- emit(1)
- expect(3)
- emit(2)
- }.flatMap {
- if (it == 1) flow<Int> {
- expect(5)
- latch.send(Unit)
- hang {
- expect(7)
- throw TestException2()
-
- }
- } else {
- expect(4)
- latch.receive()
- expect(6)
- throw TestException()
- }
- }
-
- expect(1)
- assertFailsWith<TestException>(flow)
- finish(8)
- }
-
- @Test
- fun testContext() = runTest {
- val captured = ArrayList<String>()
- val flow = flowOf(1)
- .flowOn(NamedDispatchers("irrelevant"))
- .flatMap {
- captured += NamedDispatchers.name()
- flow {
- captured += NamedDispatchers.name()
- emit(it)
- }
- }
-
- flow.flowOn(NamedDispatchers("1")).sum()
- flow.flowOn(NamedDispatchers("2")).sum()
- assertEquals(listOf("1", "1", "2", "2"), captured)
- }
-
- @Test
- fun testIsolatedContext() = runTest {
- val flow = flowOf(1)
- .flowOn(NamedDispatchers("irrelevant"))
- .flowWith(NamedDispatchers("inner")) {
- flatMap {
- flow {
- assertEquals("inner", NamedDispatchers.name())
- emit(it)
- }
- }
- }.flowOn(NamedDispatchers("irrelevant"))
- .flatMap {
- flow {
- assertEquals("outer", NamedDispatchers.name())
- emit(it)
- }
- }.flowOn(NamedDispatchers("outer"))
-
- assertEquals(1, flow.singleOrNull())
- }
-
- @Test
- fun testFlatMapConcurrency() = runTest {
- var concurrentRequests = 0
- val flow = (1..100).asFlow().flatMap(concurrency = 2) { value ->
- flow {
- ++concurrentRequests
- emit(value)
- delay(Long.MAX_VALUE)
- }
- }
-
- val consumer = launch {
- flow.collect { value ->
- expect(value)
- }
- }
-
- repeat(4) {
- yield()
- }
-
- assertEquals(2, concurrentRequests)
- consumer.cancelAndJoin()
- finish(3)
- }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlattenConcatTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlattenConcatTest.kt
new file mode 100644
index 0000000..604b5df
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlattenConcatTest.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.operators.*
+import kotlin.test.*
+
+class FlattenConcatTest : FlatMapBaseTest() {
+
+ override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = map(mapper).flattenConcat()
+
+ @Test
+ fun testFlatMapConcurrency() = runTest {
+ var concurrentRequests = 0
+ val flow = (1..100).asFlow().map { value ->
+ flow {
+ ++concurrentRequests
+ emit(value)
+ delay(Long.MAX_VALUE)
+ }
+ }.flattenConcat()
+
+ val consumer = launch {
+ flow.collect { value ->
+ expect(value)
+ }
+ }
+
+ repeat(4) {
+ yield()
+ }
+
+ assertEquals(1, concurrentRequests)
+ consumer.cancelAndJoin()
+ finish(2)
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt
new file mode 100644
index 0000000..7774d17
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.operators.*
+import kotlin.test.*
+
+class FlattenMergeTest : FlatMapMergeBaseTest() {
+
+ override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = map(mapper).flattenMerge()
+
+ @Test
+ override fun testFlatMapConcurrency() = runTest {
+ var concurrentRequests = 0
+ val flow = (1..100).asFlow().map() { value ->
+ flow {
+ ++concurrentRequests
+ emit(value)
+ delay(Long.MAX_VALUE)
+ }
+ }.flattenMerge(concurrency = 2)
+
+ val consumer = launch {
+ flow.collect { value ->
+ expect(value)
+ }
+ }
+
+ repeat(4) {
+ yield()
+ }
+
+ assertEquals(2, concurrentRequests)
+ consumer.cancelAndJoin()
+ finish(3)
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt
index c6a2357..ad18a69 100644
--- a/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt
@@ -36,7 +36,7 @@
withContext(Dispatchers.Default) {
val inFlightElements = AtomicLong(0L)
var result = 0
- (1..iterations step 4).asFlow().flatMap(bufferSize = bufferSize) { value ->
+ (1..iterations step 4).asFlow().flatMapMerge(bufferSize = bufferSize) { value ->
unsafeFlow {
repeat(4) {
emit(value + it)
@@ -59,7 +59,7 @@
@Test
fun testDelivery() = runTest {
withContext(Dispatchers.Default) {
- val result = (1..iterations step 4).asFlow().flatMap { value ->
+ val result = (1..iterations step 4).asFlow().flatMapMerge { value ->
unsafeFlow {
repeat(4) { emit(value + it) }
}
@@ -72,7 +72,7 @@
fun testIndependentShortBursts() = runTest {
withContext(Dispatchers.Default) {
repeat(iterations) {
- val result = (1..4).asFlow().flatMap { value ->
+ val result = (1..4).asFlow().flatMapMerge { value ->
unsafeFlow {
emit(value)
emit(value)
@@ -86,7 +86,7 @@
private suspend fun testConcurrencyLevel(maxConcurrency: Int) {
assumeTrue(maxConcurrency <= CORE_POOL_SIZE)
val concurrency = AtomicLong()
- val result = (1..iterations).asFlow().flatMap(concurrency = maxConcurrency) { value ->
+ val result = (1..iterations).asFlow().flatMapMerge(concurrency = maxConcurrency) { value ->
unsafeFlow {
val current = concurrency.incrementAndGet()
assertTrue(current in 1..maxConcurrency)