FlatMap-related API rework

   * Deprecate flatMap to see the feedback about it
   * Introduce flatMapMerge and flatMapConcat (concurrent and sequential versions)
   * Rename concat to flattenMerge and flattenConcat to be more like Sequence
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index ef57668..68baa5f 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -793,8 +793,6 @@
 	public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
 	public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
 	public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
-	public static final fun concatenate (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
-	public static final fun concatenate (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
@@ -807,8 +805,12 @@
 	public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
-	public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
-	public static synthetic fun flatMap$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun flattenConcat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow;
+	public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/Flow;
@@ -820,10 +822,6 @@
 	public static final fun fold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
-	public static final fun merge (Ljava/lang/Iterable;II)Lkotlinx/coroutines/flow/Flow;
-	public static final fun merge (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow;
-	public static synthetic fun merge$default (Ljava/lang/Iterable;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
-	public static synthetic fun merge$default (Lkotlinx/coroutines/flow/Flow;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -851,8 +849,10 @@
 	public static final fun BehaviourSubject ()Ljava/lang/Object;
 	public static final fun PublishSubject ()Ljava/lang/Object;
 	public static final fun ReplaySubject ()Ljava/lang/Object;
-	public static final fun concat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index 5ab2dcf..b5da8ee 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -64,18 +64,44 @@
 @Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
 public fun <T> Flow<T>.subscribe(onEach: (T) -> Unit, onError: (Throwable) -> Unit): Unit = error("Should not be called")
 
-/** @suppress **/
+
+/**
+ * Note that this replacement is sequential (`concat`) by default.
+ * For concurrent flatMap [flatMapMerge] can be used instead.
+ * @suppress
+ */
 @Deprecated(
     level = DeprecationLevel.ERROR,
-    message = "Flow analogue is named concatenate",
-    replaceWith = ReplaceWith("concatenate()")
+    message = "Flow analogue is named flatMapConcat",
+    replaceWith = ReplaceWith("flatMapConcat(mapper)")
 )
-public fun <T> Flow<T>.concat(): Flow<T> = error("Should not be called")
+public fun <T, R> Flow<T>.flatMap(mapper: suspend (T) -> Flow<R>): Flow<R> = error("Should not be called")
 
 /** @suppress **/
 @Deprecated(
     level = DeprecationLevel.ERROR,
-    message = "Flow analogue is named concatenate",
-    replaceWith = ReplaceWith("concatenate(mapper)")
+    message = "Flow analogue is named flatMapConcat",
+    replaceWith = ReplaceWith("flatMapConcat(mapper)")
 )
 public fun <T, R> Flow<T>.concatMap(mapper: (T) -> Flow<R>): Flow<R> = error("Should not be called")
+
+
+/**
+ * Note that this replacement is sequential (`concat`) by default.
+ * For concurrent flatMap [flattenMerge] can be used instead.
+ * @suppress
+ */
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Flow analogue is named flattenConcat",
+    replaceWith = ReplaceWith("flattenConcat()")
+)
+public fun <T> Flow<Flow<T>>.merge(): Flow<T> = error("Should not be called")
+
+/** @suppress **/
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Flow analogue is named flattenConcat",
+    replaceWith = ReplaceWith("flattenConcat()")
+)
+public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = error("Should not be called")
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
index 84257a1..cc19d3a 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
@@ -7,12 +7,29 @@
 @file:Suppress("unused")
 
 package kotlinx.coroutines.flow
+
 import kotlinx.atomicfu.*
 import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.flow.internal.*
-import kotlinx.coroutines.flow.unsafeFlow as flow
 import kotlin.jvm.*
+import kotlinx.coroutines.flow.unsafeFlow as flow
+
+/**
+ * Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then concatenating and flattening these flows.
+ * This method is identical to `flatMapMerge(concurrency = 1, bufferSize = 1)`
+ *
+ * Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
+ * Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
+ */
+@FlowPreview
+public fun <T, R> Flow<T>.flatMapConcat(mapper: suspend (value: T) -> Flow<R>): Flow<R> = flow {
+    collect { value ->
+        mapper(value).collect { innerValue ->
+            emit(innerValue)
+        }
+    }
+}
 
 /**
  * Transforms elements emitted by the original flow by applying [mapper], that returns another flow, and then merging and flattening these flows.
@@ -21,10 +38,10 @@
  * Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
  *
  * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
- * [concurrency] parameter controls the size of in-flight flows.
+ * [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
  */
 @FlowPreview
-public fun <T, R> Flow<T>.flatMap(concurrency: Int = 16, bufferSize: Int = 16, mapper: suspend (value: T) -> Flow<R>): Flow<R> {
+public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int = 16, mapper: suspend (value: T) -> Flow<R>): Flow<R> {
     return flow {
         val semaphore = Channel<Unit>(concurrency)
         val flatMap = SerializingFlatMapCollector(this, bufferSize)
@@ -47,56 +64,36 @@
 }
 
 /**
- * Merges the given sequence of flows into a single flow with no guarantees on the order.
- *
- * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
- * [concurrency] parameter controls the size of in-flight flows.
+ * Flattens the given flow of flows into a single flow in a sequentially manner, without interleaving nested flows.
+ * This method is identical to `flattenMerge(concurrency = 1, bufferSize = 1)
  */
 @FlowPreview
-public fun <T> Iterable<Flow<T>>.merge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = asFlow().flatMap(concurrency, bufferSize) { it }
-
-/**
- * Merges the given flow of flows into a single flow with no guarantees on the order.
- *
- * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
- * [concurrency] parameter controls the size of in-flight flows.
- */
-@FlowPreview
-public fun <T> Flow<Flow<T>>.merge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMap(concurrency, bufferSize) { it }
-
-/**
- * Concatenates values of each flow sequentially, without interleaving them.
- */
-@FlowPreview
-public fun <T> Flow<Flow<T>>.concatenate(): Flow<T> = flow {
-    collect {
-        val inner = it
-        inner.collect { value ->
-            emit(value)
-        }
-    }
-}
-
-/**
- * Transforms each value of the given flow into flow of another type and then flattens these flows
- * sequentially, without interleaving them.
- */
-@FlowPreview
-public fun <T, R> Flow<T>.concatenate(mapper: suspend (T) -> Flow<R>): Flow<R> = flow {
+public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
     collect { value ->
-        mapper(value).collect { innerValue ->
+        value.collect { innerValue ->
             emit(innerValue)
         }
     }
 }
 
+/**
+ * Flattens the given flow of flows into a single flow.
+ * This method is identical to `flatMapMerge(concurrency, bufferSize) { it }`
+ *
+ * [bufferSize] parameter controls the size of backpressure aka the amount of queued in-flight elements.
+ * [concurrency] parameter controls the size of in-flight flows, at most [concurrency] flows are collected at the same time.
+ */
+@FlowPreview
+public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMapMerge(concurrency, bufferSize) { it }
+
+
 // Effectively serializes access to downstream collector from flatMap
 private class SerializingFlatMapCollector<T>(
     private val downstream: FlowCollector<T>,
     private val bufferSize: Int
 ) {
 
-    // Let's try to leverage the fact that flatMap is never contended
+    // Let's try to leverage the fact that flatMapMerge is never contended
     private val channel: Channel<Any?> by lazy { Channel<Any?>(bufferSize) } // Should be any, but KT-30796
     private val inProgressLock = atomic(false)
     private val sentValues = atomic(0)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateMapTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateMapTest.kt
deleted file mode 100644
index d4e15a8..0000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateMapTest.kt
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
-import kotlin.test.*
-
-class ConcatenateMapTest : TestBase() {
-    @Test
-    fun testConcatenate() = runTest {
-        val n = 100
-        val sum = flow {
-            repeat(n) {
-                emit(it + 1) // 1..100
-            }
-        }.concatenate { value ->
-            // 1 + (1 + 2) + (1 + 2 + 3) + ... (1 + .. + n)
-            flow {
-                repeat(value) {
-                    emit(it + 1)
-                }
-            }
-        }.sum()
-
-        assertEquals(n * (n + 1) * (n + 2) / 6, sum)
-    }
-
-    @Test
-    fun testSingle() = runTest {
-        val flow = flow {
-            repeat(100) {
-                emit(it)
-            }
-        }.concatenate { value ->
-            if (value == 99) flowOf(42)
-            else flowOf()
-        }
-
-        val value = flow.single()
-        assertEquals(42, value)
-    }
-
-    @Test
-    fun testFailure() = runTest {
-        var finally = false
-        val latch = Channel<Unit>()
-        val flow = flow {
-            coroutineScope {
-                launch {
-                    latch.send(Unit)
-                    hang { finally = true }
-                }
-
-                emit(1)
-            }
-        }.concatenate {
-            flow<Int> {
-                latch.receive()
-                throw TestException()
-            }
-        }
-
-        assertFailsWith<TestException> { flow.count() }
-        assertTrue(finally)
-    }
-
-    @Test
-    fun testFailureInMapOperation() = runTest {
-        val latch = Channel<Unit>()
-        val flow = flow {
-            coroutineScope {
-                launch {
-                    latch.send(Unit)
-                    hang { expect(3) }
-                }
-
-                expect(1)
-                emit(1)
-            }
-        }.concatenate<Int, Int> {
-            latch.receive()
-            expect(2)
-            throw TestException()
-            flowOf<Int>() // Workaround for KT-30642, return type should not be Nothing
-        }
-
-        assertFailsWith<TestException> { flow.count() }
-        finish(4)
-    }
-
-    @Test
-    fun testContext() = runTest {
-        val captured = ArrayList<String>()
-        val flow = flowOf(1)
-            .flowOn(NamedDispatchers("irrelevant"))
-            .concatenate {
-                flow {
-                    captured += NamedDispatchers.name()
-                    emit(it)
-                }
-            }
-
-        flow.flowOn(NamedDispatchers("1")).sum()
-        flow.flowOn(NamedDispatchers("2")).sum()
-        assertEquals(listOf("1", "2"), captured)
-    }
-
-
-    @Test
-    fun testIsolatedContext() = runTest {
-        val flow = flowOf(1)
-            .flowOn(NamedDispatchers("irrelevant"))
-            .flowWith(NamedDispatchers("inner")) {
-                concatenate {
-                    flow {
-                        expect(2)
-                        assertEquals("inner", NamedDispatchers.name())
-                        emit(it)
-                    }
-                }
-            }.flowOn(NamedDispatchers("irrelevant"))
-            .concatenate {
-                flow {
-                    expect(3)
-                    assertEquals("outer", NamedDispatchers.name())
-                    emit(it)
-                }
-            }.flowOn(NamedDispatchers("outer"))
-
-        expect(1)
-        assertEquals(1, flow.single())
-        finish(4)
-    }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateTest.kt
deleted file mode 100644
index 6a50bc9..0000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/ConcatenateTest.kt
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlin.test.*
-
-class ConcatenateTest : TestBase() {
-    @Test
-    fun testConcatenate() = runTest {
-        val n = 100
-        val sum = (1..n).asFlow()
-            .map { value ->
-                flow {
-                    repeat(value) {
-                        emit(it + 1)
-                    }
-            }
-        }.concatenate().sum()
-        assertEquals(n * (n + 1) * (n + 2) / 6, sum)
-    }
-
-    @Test
-    fun testSingle() = runTest {
-        val flows = flow {
-            repeat(100) {
-                if (it == 99) emit(flowOf(42))
-                else emit(flowOf())
-            }
-        }
-
-        val value = flows.concatenate().single()
-        assertEquals(42, value)
-    }
-
-
-    @Test
-    fun testContext() = runTest {
-        val flow = flow {
-            emit(flow {
-                expect(2)
-                assertEquals("first", NamedDispatchers.name())
-                emit(1)
-            }.flowOn(NamedDispatchers("first")))
-
-            emit(flow {
-                expect(3)
-                assertEquals("second", NamedDispatchers.name())
-                emit(1)
-            }.flowOn(NamedDispatchers("second")))
-        }.concatenate().flowOn(NamedDispatchers("first"))
-
-        expect(1)
-        assertEquals(2, flow.sum())
-        finish(4)
-    }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapBaseTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapBaseTest.kt
new file mode 100644
index 0000000..766b4a4
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapBaseTest.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.operators
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+
+abstract class FlatMapBaseTest : TestBase() {
+
+    abstract fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T>
+
+    @Test
+    fun testFlatMap() = runTest {
+        val n = 100
+        val sum = (1..100).asFlow()
+            .flatMap { value ->
+                // 1 + (1 + 2) + (1 + 2 + 3) + ... (1 + .. + n)
+                flow {
+                    repeat(value) {
+                        emit(it + 1)
+                    }
+                }
+            }.sum()
+
+        assertEquals(n * (n + 1) * (n + 2) / 6, sum)
+    }
+
+    @Test
+    fun testSingle() = runTest {
+        val flow = flow {
+            repeat(100) {
+                emit(it)
+            }
+        }.flatMap { value ->
+            if (value == 99) flowOf(42)
+            else flowOf()
+        }
+
+        val value = flow.single()
+        assertEquals(42, value)
+    }
+
+    @Test
+    fun testContext() = runTest {
+        val captured = ArrayList<String>()
+        val flow = flowOf(1)
+            .flowOn(NamedDispatchers("irrelevant"))
+            .flatMap {
+                captured += NamedDispatchers.name()
+                flow {
+                    captured += NamedDispatchers.name()
+                    emit(it)
+                }
+            }
+
+        flow.flowOn(NamedDispatchers("1")).sum()
+        flow.flowOn(NamedDispatchers("2")).sum()
+        assertEquals(listOf("1", "1", "2", "2"), captured)
+    }
+
+    @Test
+    fun testIsolatedContext() = runTest {
+        val flow = flowOf(1)
+            .flowOn(NamedDispatchers("irrelevant"))
+            .flowWith(NamedDispatchers("inner")) {
+                flatMap {
+                    flow {
+                        assertEquals("inner", NamedDispatchers.name())
+                        emit(it)
+                    }
+                }
+            }.flowOn(NamedDispatchers("irrelevant"))
+            .flatMap {
+                flow {
+                    assertEquals("outer", NamedDispatchers.name())
+                    emit(it)
+                }
+            }.flowOn(NamedDispatchers("outer"))
+
+        assertEquals(1, flow.singleOrNull())
+    }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapConcatTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapConcatTest.kt
new file mode 100644
index 0000000..4bf1b16
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapConcatTest.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.operators.*
+import kotlin.test.*
+
+class FlatMapConcatTest : FlatMapBaseTest() {
+
+    override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = flatMapConcat(mapper = mapper)
+
+    @Test
+    fun testFlatMapConcurrency() = runTest {
+        var concurrentRequests = 0
+        val flow = (1..100).asFlow().flatMapConcat { value ->
+            flow {
+                ++concurrentRequests
+                emit(value)
+                delay(Long.MAX_VALUE)
+            }
+        }
+
+        val consumer = launch {
+            flow.collect { value ->
+                expect(value)
+            }
+        }
+
+        repeat(4) {
+            yield()
+        }
+
+        assertEquals(1, concurrentRequests)
+        consumer.cancelAndJoin()
+        finish(2)
+    }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeBaseTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeBaseTest.kt
new file mode 100644
index 0000000..fe37ae6
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeBaseTest.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+
+package kotlinx.coroutines.flow.operators
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+
+abstract class FlatMapMergeBaseTest : FlatMapBaseTest() {
+    @Test
+    fun testFailureCancellation() = runTest {
+        val flow = flow {
+            expect(2)
+            emit(1)
+            expect(3)
+            emit(2)
+            expect(4)
+        }.flatMap {
+            if (it == 1) flow {
+                hang { expect(6) }
+            } else flow<Int> {
+                expect(5)
+                throw TestException()
+            }
+        }
+
+        expect(1)
+        assertFailsWith<TestException> { flow.singleOrNull() }
+        finish(7)
+    }
+
+    @Test
+    fun testConcurrentFailure() = runTest {
+        val latch = Channel<Unit>()
+        val flow = flow {
+            expect(2)
+            emit(1)
+            expect(3)
+            emit(2)
+        }.flatMap {
+            if (it == 1) flow<Int> {
+                expect(5)
+                latch.send(Unit)
+                hang {
+                    expect(7)
+                    throw TestException2()
+
+                }
+            } else {
+                expect(4)
+                latch.receive()
+                expect(6)
+                throw TestException()
+            }
+        }
+
+        expect(1)
+        assertFailsWith<TestException>(flow)
+        finish(8)
+    }
+
+    @Test
+    fun testFailureInMapOperationCancellation() = runTest {
+        val latch = Channel<Unit>()
+        val flow = flow {
+            expect(2)
+            emit(1)
+            expect(3)
+            emit(2)
+            expectUnreached()
+        }.flatMap {
+            if (it == 1) flow<Int> {
+                expect(5)
+                latch.send(Unit)
+                hang { expect(7) }
+            } else {
+                expect(4)
+                latch.receive()
+                expect(6)
+                throw TestException()
+            }
+        }
+
+        expect(1)
+        assertFailsWith<TestException> { flow.count() }
+        finish(8)
+    }
+
+    @Test
+    abstract fun testFlatMapConcurrency()
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
new file mode 100644
index 0000000..cbd68eb
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.operators.*
+import kotlin.test.*
+
+class FlatMapMergeTest : FlatMapMergeBaseTest() {
+
+    override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = flatMapMerge(mapper = mapper)
+
+    @Test
+    override fun testFlatMapConcurrency() = runTest {
+        var concurrentRequests = 0
+        val flow = (1..100).asFlow().flatMapMerge(concurrency = 2) { value ->
+            flow {
+                ++concurrentRequests
+                emit(value)
+                delay(Long.MAX_VALUE)
+            }
+        }
+
+        val consumer = launch {
+            flow.collect { value ->
+                expect(value)
+            }
+        }
+
+        repeat(4) {
+            yield()
+        }
+
+        assertEquals(2, concurrentRequests)
+        consumer.cancelAndJoin()
+        finish(3)
+    }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapTest.kt
deleted file mode 100644
index ad5d46b..0000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapTest.kt
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
-import kotlin.test.*
-
-class FlatMapTest : TestBase() {
-
-    @Test
-    fun testFlatMap() = runTest {
-        val n = 100
-        val sum = (1..100).asFlow()
-            .flatMap { value ->
-                // 1 + (1 + 2) + (1 + 2 + 3) + ... (1 + .. + n)
-                flow {
-                    repeat(value) {
-                        emit(it + 1)
-                    }
-                }
-            }.sum()
-
-        assertEquals(n * (n + 1) * (n + 2) / 6, sum)
-    }
-
-    @Test
-    fun testSingle() = runTest {
-        val flow = flow {
-            repeat(100) {
-                emit(it)
-            }
-        }.flatMap { value ->
-            if (value == 99) flowOf(42)
-            else flowOf()
-        }
-
-        val value = flow.single()
-        assertEquals(42, value)
-    }
-
-    @Test
-    fun testFailureCancellation() = runTest {
-        val flow = flow {
-            expect(2)
-            emit(1)
-            expect(3)
-            emit(2)
-            expect(4)
-        }.flatMap {
-            if (it == 1) flow {
-                hang { expect(6) }
-            } else flow<Int> {
-                expect(5)
-                throw TestException()
-            }
-        }
-
-        expect(1)
-        assertFailsWith<TestException> { flow.singleOrNull() }
-        finish(7)
-    }
-
-    @Test
-    fun testFailureInMapOperationCancellation() = runTest {
-        val latch = Channel<Unit>()
-        val flow = flow {
-            expect(2)
-            emit(1)
-            expect(3)
-            emit(2)
-            expectUnreached()
-        }.flatMap {
-            if (it == 1) flow<Int> {
-                expect(5)
-                latch.send(Unit)
-                hang { expect(7) }
-            } else {
-                expect(4)
-                latch.receive()
-                expect(6)
-                throw TestException()
-            }
-        }
-
-        expect(1)
-        assertFailsWith<TestException> { flow.count() }
-        finish(8)
-    }
-
-    @Test
-    fun testConcurrentFailure() = runTest {
-        val latch = Channel<Unit>()
-        val flow = flow {
-            expect(2)
-            emit(1)
-            expect(3)
-            emit(2)
-        }.flatMap {
-            if (it == 1) flow<Int> {
-                expect(5)
-                latch.send(Unit)
-                hang {
-                    expect(7)
-                    throw TestException2()
-
-                }
-            } else {
-                expect(4)
-                latch.receive()
-                expect(6)
-                throw TestException()
-            }
-        }
-
-        expect(1)
-        assertFailsWith<TestException>(flow)
-        finish(8)
-    }
-
-    @Test
-    fun testContext() = runTest {
-        val captured = ArrayList<String>()
-        val flow = flowOf(1)
-            .flowOn(NamedDispatchers("irrelevant"))
-            .flatMap {
-                captured += NamedDispatchers.name()
-                flow {
-                    captured += NamedDispatchers.name()
-                    emit(it)
-                }
-            }
-
-        flow.flowOn(NamedDispatchers("1")).sum()
-        flow.flowOn(NamedDispatchers("2")).sum()
-        assertEquals(listOf("1", "1", "2", "2"), captured)
-    }
-
-    @Test
-    fun testIsolatedContext() = runTest {
-        val flow = flowOf(1)
-            .flowOn(NamedDispatchers("irrelevant"))
-            .flowWith(NamedDispatchers("inner")) {
-                flatMap {
-                    flow {
-                        assertEquals("inner", NamedDispatchers.name())
-                        emit(it)
-                    }
-                }
-            }.flowOn(NamedDispatchers("irrelevant"))
-            .flatMap {
-                flow {
-                    assertEquals("outer", NamedDispatchers.name())
-                    emit(it)
-                }
-            }.flowOn(NamedDispatchers("outer"))
-
-        assertEquals(1, flow.singleOrNull())
-    }
-
-    @Test
-    fun testFlatMapConcurrency() = runTest {
-        var concurrentRequests = 0
-        val flow = (1..100).asFlow().flatMap(concurrency = 2) { value ->
-            flow {
-                ++concurrentRequests
-                emit(value)
-                delay(Long.MAX_VALUE)
-            }
-        }
-
-        val consumer = launch {
-            flow.collect { value ->
-                expect(value)
-            }
-        }
-
-        repeat(4) {
-            yield()
-        }
-
-        assertEquals(2, concurrentRequests)
-        consumer.cancelAndJoin()
-        finish(3)
-    }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlattenConcatTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlattenConcatTest.kt
new file mode 100644
index 0000000..604b5df
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlattenConcatTest.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.operators.*
+import kotlin.test.*
+
+class FlattenConcatTest : FlatMapBaseTest() {
+
+    override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = map(mapper).flattenConcat()
+
+    @Test
+    fun testFlatMapConcurrency() = runTest {
+        var concurrentRequests = 0
+        val flow = (1..100).asFlow().map { value ->
+            flow {
+                ++concurrentRequests
+                emit(value)
+                delay(Long.MAX_VALUE)
+            }
+        }.flattenConcat()
+
+        val consumer = launch {
+            flow.collect { value ->
+                expect(value)
+            }
+        }
+
+        repeat(4) {
+            yield()
+        }
+
+        assertEquals(1, concurrentRequests)
+        consumer.cancelAndJoin()
+        finish(2)
+    }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt
new file mode 100644
index 0000000..7774d17
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.operators.*
+import kotlin.test.*
+
+class FlattenMergeTest : FlatMapMergeBaseTest() {
+
+    override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = map(mapper).flattenMerge()
+
+    @Test
+    override fun testFlatMapConcurrency() = runTest {
+        var concurrentRequests = 0
+        val flow = (1..100).asFlow().map() { value ->
+            flow {
+                ++concurrentRequests
+                emit(value)
+                delay(Long.MAX_VALUE)
+            }
+        }.flattenMerge(concurrency = 2)
+
+        val consumer = launch {
+            flow.collect { value ->
+                expect(value)
+            }
+        }
+
+        repeat(4) {
+            yield()
+        }
+
+        assertEquals(2, concurrentRequests)
+        consumer.cancelAndJoin()
+        finish(3)
+    }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt
index c6a2357..ad18a69 100644
--- a/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt
@@ -36,7 +36,7 @@
         withContext(Dispatchers.Default) {
             val inFlightElements = AtomicLong(0L)
             var result = 0
-            (1..iterations step 4).asFlow().flatMap(bufferSize = bufferSize) { value ->
+            (1..iterations step 4).asFlow().flatMapMerge(bufferSize = bufferSize) { value ->
                 unsafeFlow {
                     repeat(4) {
                         emit(value + it)
@@ -59,7 +59,7 @@
     @Test
     fun testDelivery() = runTest {
         withContext(Dispatchers.Default) {
-            val result = (1..iterations step 4).asFlow().flatMap { value ->
+            val result = (1..iterations step 4).asFlow().flatMapMerge { value ->
                 unsafeFlow {
                     repeat(4) { emit(value + it) }
                 }
@@ -72,7 +72,7 @@
     fun testIndependentShortBursts() = runTest {
         withContext(Dispatchers.Default) {
             repeat(iterations) {
-                val result = (1..4).asFlow().flatMap { value ->
+                val result = (1..4).asFlow().flatMapMerge { value ->
                     unsafeFlow {
                         emit(value)
                         emit(value)
@@ -86,7 +86,7 @@
     private suspend fun testConcurrencyLevel(maxConcurrency: Int) {
         assumeTrue(maxConcurrency <= CORE_POOL_SIZE)
         val concurrency = AtomicLong()
-        val result = (1..iterations).asFlow().flatMap(concurrency = maxConcurrency) { value ->
+        val result = (1..iterations).asFlow().flatMapMerge(concurrency = maxConcurrency) { value ->
             unsafeFlow {
                 val current = concurrency.incrementAndGet()
                 assertTrue(current in 1..maxConcurrency)