Various improvements in combine implementation
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index 3f531bc..a277169 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -891,7 +891,7 @@
 	public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
-	public static final fun flatMapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun flatMapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flatten (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
@@ -908,7 +908,6 @@
 	public static synthetic fun flowViaChannel$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flowWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static synthetic fun flowWith$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
-	public static final fun flowZip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun fold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun forEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
 	public static final fun getDEFAULT_CONCURRENCY ()I
@@ -982,7 +981,7 @@
 }
 
 public final class kotlinx/coroutines/flow/internal/CombineKt {
-	public static final fun combine ([Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun combineInternal (Lkotlinx/coroutines/flow/FlowCollector;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 }
 
 public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index 28f5b19..16769ad 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -437,4 +437,4 @@
     message = "Flow analogues of 'switchMap' are 'transformLatest', 'flatMapLatest' and 'mapLatest'",
     replaceWith = ReplaceWith("this.flatMapLatest(transform)")
 )
-public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = noImpl()
+public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flatMapLatest(transform)
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
index 28b319d..f7edad0 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -8,14 +8,14 @@
 import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.flow.*
-import kotlinx.coroutines.internal.Symbol
+import kotlinx.coroutines.internal.*
 import kotlinx.coroutines.selects.*
 
 internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
 
-internal suspend inline fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
+internal suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
     first: Flow<T1>, second: Flow<T2>,
-    crossinline transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
+    transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
 ) {
     coroutineScope {
         val firstChannel = asFairChannel(first)
@@ -45,11 +45,11 @@
 }
 
 @PublishedApi
-internal fun <T, R> combine(
-    vararg flows: Flow<T>,
+internal suspend fun <R, T> FlowCollector<R>.combineInternal(
+    flows: Array<out Flow<T>>,
     arrayFactory: () -> Array<T?>,
     transform: suspend FlowCollector<R>.(Array<T>) -> Unit
-): Flow<R> = flow {
+) {
     coroutineScope {
         val size = flows.size
         val channels =
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
index 4b4b856..f621be0 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
@@ -20,6 +20,7 @@
         ChannelFlowTransformLatest(transform, flow, context, capacity)
 
     override suspend fun flowCollect(collector: FlowCollector<R>) {
+        assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
         flowScope {
             var previousFlow: Job? = null
             flow.collect { value ->
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
index 62d5e4c..f3a1126 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
@@ -24,6 +24,7 @@
  * generic function that may transform emitted element, skip it or emit it multiple times.
  *
  * This operator can be used as a building block for other operators, for example:
+ *
  * ```
  * fun Flow<Int>.skipOddAndDuplicateEven(): Flow<Int> = transform { value ->
  *     if (value % 2 == 0) { // Emit only even values, but twice
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
index 911a83c..dccc1cd 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
@@ -120,7 +120,8 @@
  * ```
  * produces `a b b_last`.
  *
- * This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
+ * This operator is [buffered][buffer] by default
+ * and size of its output buffer can be changed by applying subsequent [buffer] operator.
  */
 @ExperimentalCoroutinesApi
 public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
@@ -149,7 +150,7 @@
  * This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
  */
 @ExperimentalCoroutinesApi
-public fun <T, R> Flow<T>.flatMapLatest(@BuilderInference transform: (value: T) -> Flow<R>): Flow<R> =
+public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
     transformLatest { emitAll(transform(it)) }
 
 /**
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
index a3f5830..ba4f052 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
@@ -242,7 +242,9 @@
 public inline fun <reified T, R> combine(
     vararg flows: Flow<T>,
     crossinline transform: suspend (Array<T>) -> R
-): Flow<R> = combine(*flows, arrayFactory = { arrayOfNulls(flows.size) }, transform = { emit(transform(it)) })
+): Flow<R> = flow {
+    combineInternal(flows, { arrayOfNulls(flows.size) }, { emit(transform(it)) })
+}
 
 /**
  * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
@@ -254,7 +256,9 @@
 public inline fun <reified T, R> combineTransform(
     vararg flows: Flow<T>,
     @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
-): Flow<R> = combine(*flows, arrayFactory = { arrayOfNulls(flows.size) }, transform = { transform(it) })
+): Flow<R> = safeFlow {
+    combineInternal(flows, { arrayOfNulls(flows.size) }, { transform(it) })
+}
 
 /**
  * Returns a [Flow] whose values are generated with [transform] function by combining
@@ -266,7 +270,12 @@
     crossinline transform: suspend (Array<T>) -> R
 ): Flow<R> {
     val flowArray = flows.toList().toTypedArray()
-    return combine(*flowArray, arrayFactory = { arrayOfNulls(flowArray.size) }, transform = { emit(transform(it)) })
+    return flow {
+        combineInternal(
+            flowArray,
+            arrayFactory = { arrayOfNulls(flowArray.size) },
+            transform = { emit(transform(it)) })
+    }
 }
 
 /**
@@ -281,7 +290,9 @@
     @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
 ): Flow<R> {
     val flowArray = flows.toList().toTypedArray()
-    return combine(*flowArray, arrayFactory = { arrayOfNulls(flowArray.size) }, transform = { transform(it) })
+    return safeFlow {
+        combineInternal(flowArray, { arrayOfNulls(flowArray.size) }, { transform(it) })
+    }
 }
 
 /**
@@ -297,22 +308,5 @@
  * }
  * ```
  */
-@JvmName("flowZip")
 @ExperimentalCoroutinesApi
 public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
-
-/**
- * Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
- * The resulting flow completes as soon as one of the flows completes and cancel is called on the remaining flow.
- *
- * It can be demonstrated with the following example:
- * ```
- * val flow = flowOf(1, 2, 3).delayEach(10)
- * val flow2 = flowOf("a", "b", "c", "d").delayEach(15)
- * flow.zip(flow2) { i, s -> i.toString() + s }.collect {
- *     println(it) // Will print "1a 2b 3c"
- * }
- * ```
- */
-@ExperimentalCoroutinesApi
-public fun <T1, T2, R> zip(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(flow, flow2, transform)
diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
index 6f3af40..c9480f9 100644
--- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
+++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
@@ -35,6 +35,7 @@
  *
  * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values
  * handle an exception that might occur in the upstream flow or during processing, for example:
+ *
  * ```
  * flow
  *     .onEach { value -> updateUi(value) }
@@ -90,7 +91,9 @@
  * Terminal flow operator that collects the given flow with a provided [action].
  * The crucial difference from [collect] is that when the original flow emits a new value, [action] block for previous
  * value is cancelled.
+ *
  * It can be demonstrated by the following example:
+ *
  * ```
  * flow {
  *     emit(1)