Properly distinguish AbortFlowExceptions from different non-terminal operators
Fixes #1610
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
index 8824095..584178d 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -114,7 +114,7 @@
* Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
*/
(second as SendChannel<*>).invokeOnClose {
- if (!first.isClosedForReceive) first.cancel(AbortFlowException())
+ if (!first.isClosedForReceive) first.cancel(AbortFlowException(this@unsafeFlow))
}
val otherIterator = second.iterator()
@@ -126,9 +126,9 @@
emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
}
} catch (e: AbortFlowException) {
- // complete
+ e.checkOwnership(owner = this@unsafeFlow)
} finally {
- if (!second.isClosedForReceive) second.cancel(AbortFlowException())
+ if (!second.isClosedForReceive) second.cancel(AbortFlowException(this@unsafeFlow))
}
}
}