Properly distinguish AbortFlowExceptions from different non-terminal operators

Fixes #1610
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
index 8824095..584178d 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -114,7 +114,7 @@
          * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
          */
         (second as SendChannel<*>).invokeOnClose {
-            if (!first.isClosedForReceive) first.cancel(AbortFlowException())
+            if (!first.isClosedForReceive) first.cancel(AbortFlowException(this@unsafeFlow))
         }
 
         val otherIterator = second.iterator()
@@ -126,9 +126,9 @@
                 emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
             }
         } catch (e: AbortFlowException) {
-            // complete
+            e.checkOwnership(owner = this@unsafeFlow)
         } finally {
-            if (!second.isClosedForReceive) second.cancel(AbortFlowException())
+            if (!second.isClosedForReceive) second.cancel(AbortFlowException(this@unsafeFlow))
         }
     }
 }