Properly distinguish AbortFlowExceptions from different non-terminal operators

Fixes #1610
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
index 8824095..584178d 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -114,7 +114,7 @@
          * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
          */
         (second as SendChannel<*>).invokeOnClose {
-            if (!first.isClosedForReceive) first.cancel(AbortFlowException())
+            if (!first.isClosedForReceive) first.cancel(AbortFlowException(this@unsafeFlow))
         }
 
         val otherIterator = second.iterator()
@@ -126,9 +126,9 @@
                 emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
             }
         } catch (e: AbortFlowException) {
-            // complete
+            e.checkOwnership(owner = this@unsafeFlow)
         } finally {
-            if (!second.isClosedForReceive) second.cancel(AbortFlowException())
+            if (!second.isClosedForReceive) second.cancel(AbortFlowException(this@unsafeFlow))
         }
     }
 }
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt
index c3a85a3..7058dca 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt
@@ -5,12 +5,21 @@
 package kotlinx.coroutines.flow.internal
 
 import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
 
 /**
  * This exception is thrown when operator need no more elements from the flow.
  * This exception should never escape outside of operator's implementation.
+ * This exception can be safely ignored by non-terminal flow operator if and only if it was caught by its owner
+ * (see usages of [checkOwnership]).
  */
-internal expect class AbortFlowException() : CancellationException
+internal expect class AbortFlowException(owner: FlowCollector<*>) : CancellationException {
+    public val owner: FlowCollector<*>
+}
+
+internal fun AbortFlowException.checkOwnership(owner: FlowCollector<*>) {
+    if (this.owner !== owner) throw this
+}
 
 /**
  * Exception used to cancel child of [scopedFlow] without cancelling the whole scope.
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt
index 1343dad..6f4e8e7 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt
@@ -62,14 +62,14 @@
                 }
             }
         } catch (e: AbortFlowException) {
-            // Nothing, bail out
+            e.checkOwnership(owner = this)
         }
     }
 }
 
 private suspend fun <T> FlowCollector<T>.emitAbort(value: T) {
     emit(value)
-    throw AbortFlowException()
+    throw AbortFlowException(this)
 }
 
 /**
@@ -80,9 +80,9 @@
     try {
         collect { value ->
             if (predicate(value)) emit(value)
-            else throw AbortFlowException()
+            else throw AbortFlowException(this)
         }
     } catch (e: AbortFlowException) {
-        // Nothing, bail out
+        e.checkOwnership(owner = this)
     }
 }
diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
index c9480f9..de7d260 100644
--- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
+++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
@@ -130,5 +130,6 @@
  * Collects all the values from the given [flow] and emits them to the collector.
  * It is a shorthand for `flow.collect { value -> emit(value) }`.
  */
+@BuilderInference
 @ExperimentalCoroutinesApi
 public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) = flow.collect(this)
diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
index 875e6b6..e8433de 100644
--- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
+++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
@@ -90,7 +90,7 @@
     try {
         collect { value ->
             result = value
-            throw AbortFlowException()
+            throw AbortFlowException(NopCollector)
         }
     } catch (e: AbortFlowException) {
         // Do nothing
@@ -110,7 +110,7 @@
         collect { value ->
             if (predicate(value)) {
                 result = value
-                throw AbortFlowException()
+                throw AbortFlowException(NopCollector)
             }
         }
     } catch (e: AbortFlowException) {