More operators (#1236)

* Scan and emitAll operators
* Flow.first operators family (without firstOrNull and firstOrDefault support)
* More migrations

Fixes #1094
Fixes #1078
Fixes #1244
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index ab51a2d..1dcad70 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -822,10 +822,13 @@
 	public static final fun distinctUntilChangedBy (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun drop (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
 	public static final fun dropWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun emptyFlow ()Lkotlinx/coroutines/flow/Flow;
 	public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -853,6 +856,8 @@
 	public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
+	public static final fun scan (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
@@ -872,13 +877,17 @@
 	public static final fun BehaviourSubject ()Ljava/lang/Object;
 	public static final fun PublishSubject ()Ljava/lang/Object;
 	public static final fun ReplaySubject ()Ljava/lang/Object;
+	public static final fun compose (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun forEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
 	public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
 	public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V
 	public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)V
 	public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index bf20d2f..114a32e 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -193,3 +193,56 @@
     replaceWith = ReplaceWith("flattenConcat()")
 )
 public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = error("Should not be called")
+
+/**
+ * Kotlin has a built-in generic mechanism for making chained calls.
+ * If you wish to write something like
+ * ```
+ * myFlow.compose(MyFlowExtensions.ignoreErrors()).collect { ... }
+ * ```
+ * you can replace it with
+ *
+ * ```
+ * myFlow.let(MyFlowExtensions.ignoreErrors()).collect { ... }
+ * ```
+ *
+ * @suppress
+ */
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Kotlin analogue of compose is 'let'",
+    replaceWith = ReplaceWith("let(transformer)")
+)
+public fun <T, R> Flow<T>.compose(transformer: Flow<T>.() -> Flow<R>): Flow<R> = error("Should not be called")
+
+/**
+ * @suppress
+ */
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Kotlin analogue of 'skip' is 'drop'",
+    replaceWith = ReplaceWith("drop(count)")
+)
+public fun <T> Flow<T>.skip(count: Int): Flow<T> = error("Should not be called")
+
+/**
+ * Flow extension to iterate over elements is [collect].
+ * Foreach wasn't introduced deliberately to avoid confusion.
+ * Flow is not a collection, iteration over it may be not idempotent
+ * and can *launch* computations with side-effects.
+ * This behaviour is not reflected in [forEach] name.
+ * @suppress
+ */
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Flow analogue of 'forEach' is 'collect'",
+    replaceWith = ReplaceWith("collect(block)")
+)
+public fun <T> Flow<T>.forEach(action: suspend (value: T) -> Unit): Unit = error("Should not be called")
+
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Flow has less verbose 'scan' shortcut",
+    replaceWith = ReplaceWith("scan(initial, operation)")
+)
+public fun <T, R> Flow<T>.scanFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = error("Should not be called")
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
index de964da..29777b7 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
@@ -27,9 +27,7 @@
     predicate: ExceptionPredicate = ALWAYS_TRUE
 ): Flow<T> = collectSafely { e ->
     if (!predicate(e)) throw e
-    fallback.collect { value ->
-        emit(value)
-    }
+    emitAll(fallback)
 }
 
 /**
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
index 0fa6e8a..38b116a 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
@@ -80,11 +80,7 @@
  */
 @FlowPreview
 public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
-    collect { value ->
-        value.collect { innerValue ->
-            emit(innerValue)
-        }
-    }
+    collect { value -> emitAll(value) }
 }
 
 /**
@@ -137,9 +133,7 @@
         previousFlow?.join()
         // Undispatched to have better user experience in case of synchronous flows
         previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
-            transform(value).collect { innerValue ->
-                downstream.emit(innerValue)
-            }
+            downstream.emitAll(transform(value))
         }
     }
 }
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
index aff523d..2ef4b97 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
@@ -4,10 +4,12 @@
 
 @file:JvmMultifileClass
 @file:JvmName("FlowKt")
+@file:Suppress("UNCHECKED_CAST")
 
 package kotlinx.coroutines.flow
 
 import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.internal.NULL
 import kotlin.jvm.*
 import kotlinx.coroutines.flow.unsafeFlow as flow
 
@@ -97,3 +99,46 @@
         emit(value)
     }
 }
+
+/**
+ * Folds the given flow with [operation], emitting every intermediate result, including [initial] value.
+ * Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors.
+ * For example:
+ * ```
+ * flowOf(1, 2, 3).accumulate(emptyList<Int>()) { acc, value -> acc + value }.toList()
+ * ```
+ * will produce `[], [1], [1, 2], [1, 2, 3]]`.
+ */
+@FlowPreview
+public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow {
+    var accumulator: R = initial
+    emit(accumulator)
+    collect { value ->
+        accumulator = operation(accumulator, value)
+        emit(accumulator)
+    }
+}
+
+/**
+ * Reduces the given flow with [operation], emitting every intermediate result, including initial value.
+ * The first element is taken as initial value for operation accumulator.
+ * This operator has a sibling with initial value -- [scan].
+ *
+ * For example:
+ * ```
+ * flowOf(1, 2, 3, 4).scan { (v1, v2) -> v1 + v2 }.toList()
+ * ```
+ * will produce `[1, 3, 6, 10]`
+ */
+@FlowPreview
+public fun <T> Flow<T>.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = flow {
+    var accumulator: Any? = NULL
+    collect { value ->
+        accumulator = if (accumulator === NULL) {
+            value
+        } else {
+            operation(accumulator as T, value)
+        }
+        emit(accumulator as T)
+    }
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
index 624b51f..a6a218c 100644
--- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
+++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
@@ -32,3 +32,9 @@
     collect(object : FlowCollector<T> {
         override suspend fun emit(value: T) = action(value)
     })
+
+/**
+ * Collects all the values from the given [flow] and emits them to the collector.
+ * Shortcut for `flow.collect { value -> emit(value) }`.
+ */
+public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) = flow.collect(this)
diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
index 3a519e6..4eca3ef 100644
--- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
+++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
@@ -4,6 +4,7 @@
 
 @file:JvmMultifileClass
 @file:JvmName("FlowKt")
+@file:Suppress("UNCHECKED_CAST")
 
 package kotlinx.coroutines.flow
 
@@ -50,7 +51,7 @@
 }
 
 /**
- * Terminal operator, that awaits for one and only one value to be published.
+ * The terminal operator, that awaits for one and only one value to be published.
  * Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow
  * that contains more than one element.
  */
@@ -68,7 +69,7 @@
 }
 
 /**
- * Terminal operator, that awaits for one and only one value to be published.
+ * The terminal operator, that awaits for one and only one value to be published.
  * Throws [IllegalStateException] for flow that contains more than one element.
  */
 @FlowPreview
@@ -81,3 +82,45 @@
 
     return result
 }
+
+/**
+ * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
+ * Throws [NoSuchElementException] if the flow was empty.
+ */
+@FlowPreview
+public suspend fun <T> Flow<T>.first(): T {
+    var result: Any? = NULL
+    try {
+        collect { value ->
+            result = value
+            throw AbortFlowException()
+        }
+    } catch (e: AbortFlowException) {
+        // Do nothing
+    }
+
+    if (result === NULL) throw NoSuchElementException("Expected at least one element")
+    return result as T
+}
+
+/**
+ * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
+ * Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate].
+ */
+@FlowPreview
+public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
+    var result: Any? = NULL
+    try {
+        collect { value ->
+            if (predicate(value)) {
+                result = value
+                throw AbortFlowException()
+            }
+        }
+    } catch (e: AbortFlowException) {
+        // Do nothing
+    }
+
+    if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
+    return result as T
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt
new file mode 100644
index 0000000..d739f1a
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlin.test.*
+
+class ScanTest : TestBase() {
+    @Test
+    fun testScan() = runTest {
+        val flow = flowOf(1, 2, 3, 4, 5)
+        val result = flow.scanReduce { acc, v -> acc + v }.toList()
+        assertEquals(listOf(1, 3, 6, 10, 15), result)
+    }
+
+    @Test
+    fun testScanWithInitial() = runTest {
+        val flow = flowOf(1, 2, 3)
+        val result = flow.scan(emptyList<Int>()) { acc, value -> acc + value }.toList()
+        assertEquals(listOf(emptyList(), listOf(1), listOf(1, 2), listOf(1, 2, 3)), result)
+    }
+
+    @Test
+    fun testNulls() = runTest {
+        val flow = flowOf(null, 2, null, null, null, 5)
+        val result = flow.scanReduce { acc, v ->  if (v == null) acc else (if (acc == null) v else acc + v) }.toList()
+        assertEquals(listOf(null, 2, 2, 2, 2, 7), result)
+    }
+
+    @Test
+    fun testEmptyFlow() = runTest {
+        val result = emptyFlow<Int>().scanReduce { _, _ -> 1 }.toList()
+        assertTrue(result.isEmpty())
+    }
+
+    @Test
+    fun testErrorCancelsUpstream() = runTest {
+        expect(1)
+        val latch = Channel<Unit>()
+        val flow = flow {
+            coroutineScope {
+                launch {
+                    latch.send(Unit)
+                    hang { expect(3) }
+                }
+                emit(1)
+                emit(2)
+            }
+        }.scanReduce { _, value ->
+            expect(value) // 2
+            latch.receive()
+            throw TestException()
+        }.onErrorCollect(emptyFlow())
+
+        assertEquals(1, flow.single())
+        finish(4)
+    }
+
+    public operator fun <T> Collection<T>.plus(element: T): List<T> {
+        val result = ArrayList<T>(size + 1)
+        result.addAll(this)
+        result.add(element)
+        return result
+    }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt
new file mode 100644
index 0000000..e84d4c7
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlin.test.*
+
+class FirstTest : TestBase() {
+    @Test
+    fun testFirst() = runTest {
+        val flow = flowOf(1, 2, 3)
+        assertEquals(1, flow.first())
+    }
+
+    @Test
+    fun testNulls() = runTest {
+        val flow = flowOf(null, 1)
+        assertNull(flow.first())
+        assertNull(flow.first { it == null })
+        assertEquals(1, flow.first { it != null })
+    }
+
+    @Test
+    fun testFirstWithPredicate() = runTest {
+        val flow = flowOf(1, 2, 3)
+        assertEquals(1, flow.first { it > 0 })
+        assertEquals(2, flow.first { it > 1 })
+        assertFailsWith<NoSuchElementException> { flow.first { it > 3 } }
+    }
+
+    @Test
+    fun testFirstCancellation() = runTest {
+        val latch = Channel<Unit>()
+        val flow = flow {
+            coroutineScope {
+                launch {
+                    latch.send(Unit)
+                    hang { expect(1) }
+                }
+                emit(1)
+                emit(2)
+            }
+        }
+
+
+        val result = flow.first {
+            latch.receive()
+            true
+        }
+        assertEquals(1, result)
+        finish(2)
+    }
+
+    @Test
+    fun testEmptyFlow() = runTest {
+        assertFailsWith<NoSuchElementException> { emptyFlow<Int>().first() }
+        assertFailsWith<NoSuchElementException> { emptyFlow<Int>().first { true } }
+    }
+
+    @Test
+    fun testErrorCancelsUpstream() = runTest {
+        val latch = Channel<Unit>()
+        val flow = flow {
+            coroutineScope {
+                launch {
+                    latch.send(Unit)
+                    hang { expect(1) }
+                }
+                emit(1)
+            }
+        }
+
+        assertFailsWith<TestException> {
+            flow.first {
+                latch.receive()
+                throw TestException()
+            }
+        }
+
+        assertEquals(1, flow.first())
+        finish(2)
+    }
+}