combineLatest rework

  * Operator renamed to combine
  * Introduced combineTransform operator with custom transformer
  * Decouple API and implementation details to improve user experience from IDE
  * combine(Iterable<Flow>) and combineTransform(Iterable<Flow>) are introduced

Fixes #1224
Fixes #1262
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index b899a3b..7acc67e 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -847,12 +847,22 @@
 	public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final synthetic fun combine (Ljava/lang/Iterable;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
+	public static final synthetic fun combine ([Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
-	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
-	public static final synthetic fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static final synthetic fun combineTransform (Ljava/lang/Iterable;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun combineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function7;)Lkotlinx/coroutines/flow/Flow;
+	public static final synthetic fun combineTransform ([Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun compose (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun concatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun concatWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
@@ -887,6 +897,8 @@
 	public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
 	public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun flowCombine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun flowCombineTransform (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flowOf (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
@@ -894,6 +906,7 @@
 	public static synthetic fun flowViaChannel$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flowWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static synthetic fun flowWith$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun flowZip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun fold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun forEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
 	public static final fun getDEFAULT_CONCURRENCY ()I
@@ -964,6 +977,10 @@
 	public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
 }
 
+public final class kotlinx/coroutines/flow/internal/CombineKt {
+	public static final fun combine ([Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
+}
+
 public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
 	public static final fun checkIndexOverflow (I)I
 }
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index 0b48b97..a579c55 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -364,6 +364,50 @@
 )
 public fun <T> Flow<T>.concatWith(other: Flow<T>): Flow<T> = noImpl()
 
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Flow analogue of 'combineLatest' is 'combine'",
+    replaceWith = ReplaceWith("this.combine(other, transform)")
+)
+public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+    combine(this, other, transform)
+
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Flow analogue of 'combineLatest' is 'combine'",
+    replaceWith = ReplaceWith("combine(this, other, other2, transform)")
+)
+public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
+    other: Flow<T2>,
+    other2: Flow<T3>,
+    crossinline transform: suspend (T1, T2, T3) -> R
+) = combine(this, other, other2, transform)
+
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Flow analogue of 'combineLatest' is 'combine'",
+    replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
+)
+public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
+    other: Flow<T2>,
+    other2: Flow<T3>,
+    other3: Flow<T4>,
+    crossinline transform: suspend (T1, T2, T3, T4) -> R
+) = combine(this, other, other2, other3, transform)
+
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Flow analogue of 'combineLatest' is 'combine'",
+    replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
+)
+public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
+    other: Flow<T2>,
+    other2: Flow<T3>,
+    other3: Flow<T4>,
+    other4: Flow<T5>,
+    crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
+): Flow<R> = combine(this, other, other2, other3, other4, transform)
+
 /**
  * Delays the emission of values from this flow for the given [timeMillis].
  * Use `onStart { delay(timeMillis) }`.
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
new file mode 100644
index 0000000..28b319d
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.internal.Symbol
+import kotlinx.coroutines.selects.*
+
+internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
+
+internal suspend inline fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
+    first: Flow<T1>, second: Flow<T2>,
+    crossinline transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
+) {
+    coroutineScope {
+        val firstChannel = asFairChannel(first)
+        val secondChannel = asFairChannel(second)
+        var firstValue: Any? = null
+        var secondValue: Any? = null
+        var firstIsClosed = false
+        var secondIsClosed = false
+        while (!firstIsClosed || !secondIsClosed) {
+            select<Unit> {
+                onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
+                    firstValue = value
+                    if (secondValue !== null) {
+                        transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2)
+                    }
+                }
+
+                onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
+                    secondValue = value
+                    if (firstValue !== null) {
+                        transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2)
+                    }
+                }
+            }
+        }
+    }
+}
+
+@PublishedApi
+internal fun <T, R> combine(
+    vararg flows: Flow<T>,
+    arrayFactory: () -> Array<T?>,
+    transform: suspend FlowCollector<R>.(Array<T>) -> Unit
+): Flow<R> = flow {
+    coroutineScope {
+        val size = flows.size
+        val channels =
+            Array(size) { asFairChannel(flows[it]) }
+        val latestValues = arrayOfNulls<Any?>(size)
+        val isClosed = Array(size) { false }
+
+        // See flow.combine(other) for explanation.
+        while (!isClosed.all { it }) {
+            select<Unit> {
+                for (i in 0 until size) {
+                    onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
+                        latestValues[i] = value
+                        if (latestValues.all { it !== null }) {
+                            val arguments = arrayFactory()
+                            for (index in 0 until size) {
+                                arguments[index] = NULL.unbox(latestValues[index])
+                            }
+                            transform(arguments as Array<T>)
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+private inline fun SelectBuilder<Unit>.onReceive(
+    isClosed: Boolean,
+    channel: ReceiveChannel<Any>,
+    crossinline onClosed: () -> Unit,
+    noinline onReceive: suspend (value: Any) -> Unit
+) {
+    if (isClosed) return
+    channel.onReceiveOrNull {
+        // TODO onReceiveOrClosed when boxing issues are fixed
+        if (it === null) onClosed()
+        else onReceive(it)
+    }
+}
+
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
+    val channel = channel as ChannelCoroutine<Any>
+    flow.collect { value ->
+        return@collect channel.sendFair(value ?: NULL)
+    }
+}
+
+internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = unsafeFlow {
+    coroutineScope {
+        val first = asChannel(flow)
+        val second = asChannel(flow2)
+        /*
+         * This approach only works with rendezvous channel and is required to enforce correctness
+         * in the following scenario:
+         * ```
+         * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
+         * val f2 = flowOf(1)
+         * f1.zip(f2) { ... }
+         * ```
+         *
+         * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
+         */
+        (second as SendChannel<*>).invokeOnClose {
+            if (!first.isClosedForReceive) first.cancel(AbortFlowException())
+        }
+
+        val otherIterator = second.iterator()
+        try {
+            first.consumeEach { value ->
+                if (!otherIterator.hasNext()) {
+                    return@consumeEach
+                }
+                emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
+            }
+        } catch (e: AbortFlowException) {
+            // complete
+        } finally {
+            if (!second.isClosedForReceive) second.cancel(AbortFlowException())
+        }
+    }
+}
+
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
+    flow.collect { value ->
+        return@collect channel.send(value ?: NULL)
+    }
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
index f3a1126..62d5e4c 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
@@ -24,7 +24,6 @@
  * generic function that may transform emitted element, skip it or emit it multiple times.
  *
  * This operator can be used as a building block for other operators, for example:
- *
  * ```
  * fun Flow<Int>.skipOddAndDuplicateEven(): Flow<Int> = transform { value ->
  *     if (value % 2 == 0) { // Emit only even values, but twice
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
index 72822bb..a3f5830 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
@@ -4,15 +4,14 @@
 
 @file:JvmMultifileClass
 @file:JvmName("FlowKt")
-@file:Suppress("UNCHECKED_CAST")
+@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
 
 package kotlinx.coroutines.flow
 
 import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.flow.internal.*
-import kotlinx.coroutines.selects.*
 import kotlin.jvm.*
+import kotlinx.coroutines.flow.flow as safeFlow
 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
 
 /**
@@ -23,69 +22,123 @@
  * ```
  * val flow = flowOf(1, 2).delayEach(10)
  * val flow2 = flowOf("a", "b", "c").delayEach(15)
- * flow.combineLatest(flow2) { i, s -> i.toString() + s }.collect {
+ * flow.combine(flow2) { i, s -> i.toString() + s }.collect {
  *     println(it) // Will print "1a 2a 2b 2c"
  * }
  * ```
+ *
+ * This function is a shorthand for `flow.combineTransform(flow2) { a, b -> emit(transform(a, b)) }
  */
-@FlowPreview
-public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
-    coroutineScope {
-        val firstChannel = asFairChannel(this@combineLatest)
-        val secondChannel = asFairChannel(other)
-        var firstValue: Any? = null
-        var secondValue: Any? = null
-        var firstIsClosed = false
-        var secondIsClosed = false
-
-        /*
-         * Fun fact, this select **semantically** equivalent of the following:
-         * ```
-         * selectWhile<Unit> {
-         *     channel.onReceive {
-         *         emitCombined(...)
-         *     }
-         *     channel2.onReceive {
-         *         emitCombined(...)
-         *     }
-         * }
-         * ```
-         * but we are waiting for `channels` branch to get merged where we will change semantics of the select
-         * to ignore finished clauses.
-         *
-         * Instead (especially in the face of non-fair channels) we are using our own hand-rolled select emulation
-         * on top of previous select.
-         */
-        while (!firstIsClosed || !secondIsClosed) {
-            select<Unit> {
-                onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
-                    firstValue = value
-                    if (secondValue !== null) {
-                        emit(transform(NULL.unbox(firstValue), NULL.unbox(secondValue)))
-                    }
-                }
-
-                onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
-                    secondValue = value
-                    if (firstValue !== null) {
-                        emit(transform(NULL.unbox(firstValue), NULL.unbox(secondValue)))
-                    }
-                }
-            }
-        }
+@JvmName("flowCombine")
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow {
+    combineTransformInternal(this@combine, flow) { a, b ->
+        emit(transform(a, b))
     }
 }
 
 /**
  * Returns a [Flow] whose values are generated with [transform] function by combining
  * the most recently emitted values by each flow.
+ *
+ * It can be demonstrated with the following example:
+ * ```
+ * val flow = flowOf(1, 2).delayEach(10)
+ * val flow2 = flowOf("a", "b", "c").delayEach(15)
+ * combine(flow, flow2) { i, s -> i.toString() + s }.collect {
+ *     println(it) // Will print "1a 2a 2b 2c"
+ * }
+ * ```
+ *
+ * This function is a shorthand for `combineTransform(flow, flow2) { a, b -> emit(transform(a, b)) }
  */
-@FlowPreview
-public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
-    other: Flow<T2>,
-    other2: Flow<T3>,
-    crossinline transform: suspend (T1, T2, T3) -> R
-): Flow<R> = (this as Flow<*>).combineLatest(other, other2) { args: Array<*> ->
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> =
+    flow.combine(flow2, transform)
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ *
+ * Its usage can be demonstrated with the following example:
+ * ```
+ * val flow = requestFlow()
+ * val flow2 = searchEngineFlow()
+ * flow.combineTransform(flow2) { request, searchEngine ->
+ *     emit("Downloading in progress")
+ *     val result = download(request, searchEngine)
+ *     emit(result)
+ * }
+ * ```
+ */
+@JvmName("flowCombineTransform")
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> Flow<T1>.combineTransform(
+    flow: Flow<T2>,
+    @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
+): Flow<R> = safeFlow {
+    combineTransformInternal(this@combineTransform, flow) { a, b ->
+        transform(a, b)
+    }
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ *
+ * Its usage can be demonstrated with the following example:
+ * ```
+ * val flow = requestFlow()
+ * val flow2 = searchEngineFlow()
+ * combineTransform(flow, flow2) { request, searchEngine ->
+ *     emit("Downloading in progress")
+ *     val result = download(request, searchEngine)
+ *     emit(result)
+ * }
+ * ```
+ */
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> combineTransform(
+    flow: Flow<T1>,
+    flow2: Flow<T2>,
+    @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
+): Flow<R> = combineTransform(flow, flow2, transform)
+
+/**
+ * Returns a [Flow] whose values are generated with [transform] function by combining
+ * the most recently emitted values by each flow.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, R> combine(
+    flow: Flow<T1>,
+    flow2: Flow<T2>,
+    flow3: Flow<T3>,
+    @BuilderInference crossinline transform: suspend (T1, T2, T3) -> R
+): Flow<R> = combine(flow, flow2, flow3) { args: Array<*> ->
+    transform(
+        args[0] as T1,
+        args[1] as T2,
+        args[2] as T3
+    )
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, R> combineTransform(
+    flow: Flow<T1>,
+    flow2: Flow<T2>,
+    flow3: Flow<T3>,
+    @BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
+): Flow<R> = combineTransform(flow, flow2, flow3) { args: Array<*> ->
     transform(
         args[0] as T1,
         args[1] as T2,
@@ -97,13 +150,36 @@
  * Returns a [Flow] whose values are generated with [transform] function by combining
  * the most recently emitted values by each flow.
  */
-@FlowPreview
-public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
-    other: Flow<T2>,
-    other2: Flow<T3>,
-    other3: Flow<T4>,
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, T4, R> combine(
+    flow: Flow<T1>,
+    flow2: Flow<T2>,
+    flow3: Flow<T3>,
+    flow4: Flow<T4>,
     crossinline transform: suspend (T1, T2, T3, T4) -> R
-): Flow<R> = (this as Flow<*>).combineLatest(other, other2, other3) { args: Array<*> ->
+): Flow<R> = combine(flow, flow2, flow3, flow4) { args: Array<*> ->
+    transform(
+        args[0] as T1,
+        args[1] as T2,
+        args[2] as T3,
+        args[3] as T4
+    )
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, T4, R> combineTransform(
+    flow: Flow<T1>,
+    flow2: Flow<T2>,
+    flow3: Flow<T3>,
+    flow4: Flow<T4>,
+    @BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
+): Flow<R> = combineTransform(flow, flow2, flow3, flow4) { args: Array<*> ->
     transform(
         args[0] as T1,
         args[1] as T2,
@@ -116,14 +192,39 @@
  * Returns a [Flow] whose values are generated with [transform] function by combining
  * the most recently emitted values by each flow.
  */
-@FlowPreview
-public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
-    other: Flow<T2>,
-    other2: Flow<T3>,
-    other3: Flow<T4>,
-    other4: Flow<T5>,
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, T4, T5, R> combine(
+    flow: Flow<T1>,
+    flow2: Flow<T2>,
+    flow3: Flow<T3>,
+    flow4: Flow<T4>,
+    flow5: Flow<T5>,
     crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
-): Flow<R> = (this as Flow<*>).combineLatest(other, other2, other3, other4) { args: Array<*> ->
+): Flow<R> = combine(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
+    transform(
+        args[0] as T1,
+        args[1] as T2,
+        args[2] as T3,
+        args[3] as T4,
+        args[4] as T5
+    )
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, T4, T5, R> combineTransform(
+    flow: Flow<T1>,
+    flow2: Flow<T2>,
+    flow3: Flow<T3>,
+    flow4: Flow<T4>,
+    flow5: Flow<T5>,
+    @BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
+): Flow<R> = combineTransform(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
     transform(
         args[0] as T1,
         args[1] as T2,
@@ -137,64 +238,68 @@
  * Returns a [Flow] whose values are generated with [transform] function by combining
  * the most recently emitted values by each flow.
  */
-@FlowPreview
-public inline fun <reified T, R> Flow<T>.combineLatest(vararg others: Flow<T>, crossinline transform: suspend (Array<T>) -> R): Flow<R> =
-    combineLatest(*others, arrayFactory = { arrayOfNulls(others.size + 1) }, transform = { transform(it) })
+@ExperimentalCoroutinesApi
+public inline fun <reified T, R> combine(
+    vararg flows: Flow<T>,
+    crossinline transform: suspend (Array<T>) -> R
+): Flow<R> = combine(*flows, arrayFactory = { arrayOfNulls(flows.size) }, transform = { emit(transform(it)) })
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <reified T, R> combineTransform(
+    vararg flows: Flow<T>,
+    @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
+): Flow<R> = combine(*flows, arrayFactory = { arrayOfNulls(flows.size) }, transform = { transform(it) })
 
 /**
  * Returns a [Flow] whose values are generated with [transform] function by combining
  * the most recently emitted values by each flow.
  */
-@PublishedApi
-internal fun <T, R> Flow<T>.combineLatest(vararg others: Flow<T>, arrayFactory: () -> Array<T?>, transform: suspend (Array<T>) -> R): Flow<R> = flow {
-    coroutineScope {
-        val size = others.size + 1
-        val channels =
-            Array(size) { if (it == 0) asFairChannel(this@combineLatest) else asFairChannel(others[it - 1]) }
-        val latestValues = arrayOfNulls<Any?>(size)
-        val isClosed = Array(size) { false }
-
-        // See flow.combineLatest(other) for explanation.
-        while (!isClosed.all { it }) {
-            select<Unit> {
-                for (i in 0 until size) {
-                    onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
-                        latestValues[i] = value
-                        if (latestValues.all { it !== null }) {
-                            val arguments = arrayFactory()
-                            for (index in 0 until size) {
-                                arguments[index] = NULL.unbox(latestValues[index])
-                            }
-                            emit(transform(arguments as Array<T>))
-                        }
-                    }
-                }
-            }
-        }
-    }
+@ExperimentalCoroutinesApi
+public inline fun <reified T, R> combine(
+    flows: Iterable<Flow<T>>,
+    crossinline transform: suspend (Array<T>) -> R
+): Flow<R> {
+    val flowArray = flows.toList().toTypedArray()
+    return combine(*flowArray, arrayFactory = { arrayOfNulls(flowArray.size) }, transform = { emit(transform(it)) })
 }
 
-private inline fun SelectBuilder<Unit>.onReceive(
-    isClosed: Boolean,
-    channel: ReceiveChannel<Any>,
-    crossinline onClosed: () -> Unit,
-    noinline onReceive: suspend (value: Any) -> Unit
-) {
-    if (isClosed) return
-    channel.onReceiveOrNull {
-        if (it === null) onClosed()
-        else onReceive(it)
-    }
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <reified T, R> combineTransform(
+    flows: Iterable<Flow<T>>,
+    @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
+): Flow<R> {
+    val flowArray = flows.toList().toTypedArray()
+    return combine(*flowArray, arrayFactory = { arrayOfNulls(flowArray.size) }, transform = { transform(it) })
 }
 
-// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
-private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
-    val channel = channel as ChannelCoroutine<Any>
-    flow.collect { value ->
-        channel.sendFair(value ?: NULL)
-    }
-}
-
+/**
+ * Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
+ * The resulting flow completes as soon as one of the flows completes and cancel is called on the remaining flow.
+ *
+ * It can be demonstrated with the following example:
+ * ```
+ * val flow = flowOf(1, 2, 3).delayEach(10)
+ * val flow2 = flowOf("a", "b", "c", "d").delayEach(15)
+ * flow.zip(flow2) { i, s -> i.toString() + s }.collect {
+ *     println(it) // Will print "1a 2b 3c"
+ * }
+ * ```
+ */
+@JvmName("flowZip")
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
 
 /**
  * Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
@@ -210,45 +315,4 @@
  * ```
  */
 @ExperimentalCoroutinesApi
-public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
-    coroutineScope {
-        val first = asChannel(this@zip)
-        val second = asChannel(other)
-        /*
-         * This approach only works with rendezvous channel and is required to enforce correctness
-         * in the following scenario:
-         * ```
-         * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
-         * val f2 = flowOf(1)
-         * f1.zip(f2) { ... }
-         * ```
-         *
-         * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
-         */
-        (second as SendChannel<*>).invokeOnClose {
-            if (!first.isClosedForReceive) first.cancel(AbortFlowException())
-        }
-
-        val otherIterator = second.iterator()
-        try {
-            first.consumeEach { value ->
-                if (!otherIterator.hasNext()) {
-                    return@consumeEach
-                }
-                val secondValue = NULL.unbox<T2>(otherIterator.next())
-                emit(transform(NULL.unbox(value), NULL.unbox(secondValue)))
-            }
-        } catch (e: AbortFlowException) {
-            // complete
-        } finally {
-            if (!second.isClosedForReceive) second.cancel(AbortFlowException())
-        }
-    }
-}
-
-// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
-private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
-    flow.collect { value ->
-        channel.send(value ?: NULL)
-    }
-}
+public fun <T1, T2, R> zip(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(flow, flow2, transform)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt
deleted file mode 100644
index 37726fa..0000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow.operators
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.*
-import kotlin.test.*
-
-class CombineLatestVarargTest : TestBase() {
-
-    @Test
-    fun testThreeParameters() = runTest {
-        val flow = flowOf("1").combineLatest(flowOf(2), flowOf(null)) { a, b, c ->
-            a + b + c
-        }
-
-        assertEquals("12null", flow.single())
-    }
-
-    @Test
-    fun testFourParameters() = runTest {
-        val flow = flowOf("1").combineLatest(flowOf(2), flowOf("3"), flowOf(null)) { a, b, c, d ->
-            a + b + c + d
-        }
-
-        assertEquals("123null", flow.single())
-    }
-
-    @Test
-    fun testFiveParameters() = runTest {
-        val flow =
-            flowOf("1").combineLatest(flowOf(2), flowOf("3"), flowOf(4.toByte()), flowOf(null)) { a, b, c, d, e ->
-                a + b + c + d + e
-            }
-
-        assertEquals("1234null", flow.single())
-    }
-
-    @Test
-    fun testVararg() = runTest {
-        val flow = flowOf("1").combineLatest(
-            flowOf(2),
-            flowOf("3"),
-            flowOf(4.toByte()),
-            flowOf("5"),
-            flowOf(null)
-        ) { arr -> arr.joinToString("") }
-        assertEquals("12345null", flow.single())
-    }
-
-    @Test
-    fun testEmptyVararg() = runTest {
-        val list = flowOf(1, 2, 3).combineLatest { args: Array<Any?> -> args[0] }.toList()
-        assertEquals(listOf(1, 2, 3), list)
-    }
-
-    @Test
-    fun testNonNullableAny() = runTest {
-        val value = flowOf(1).combineLatest(flowOf(2)) { args: Array<Int> ->
-            @Suppress("USELESS_IS_CHECK")
-            assertTrue(args is Array<Int>)
-            args[0] + args[1]
-        }.single()
-        assertEquals(3, value)
-    }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt
new file mode 100644
index 0000000..a987c83
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.operators
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+
+class CombineParametersTest : TestBase() {
+
+    @Test
+    fun testThreeParameters() = runTest {
+        val flow = combine(flowOf("1"), flowOf(2), flowOf(null)) { a, b, c -> a + b + c }
+        assertEquals("12null", flow.single())
+
+        val flow2 = combineTransform(flowOf("1"), flowOf(2), flowOf(null)) { a, b, c -> emit(a + b + c) }
+        assertEquals("12null", flow2.single())
+    }
+
+    @Test
+    fun testThreeParametersTransform() = runTest {
+        val flow = combineTransform(flowOf("1"), flowOf(2), flowOf(null)) { a, b, c -> emit(a + b + c) }
+        assertEquals("12null", flow.single())
+    }
+
+    @Test
+    fun testFourParameters() = runTest {
+        val flow = combine(flowOf("1"), flowOf(2), flowOf("3"), flowOf(null)) { a, b, c, d -> a + b + c + d }
+        assertEquals("123null", flow.single())
+    }
+
+    @Test
+    fun testFourParametersTransform() = runTest {
+        val flow = combineTransform(flowOf("1"), flowOf(2), flowOf("3"), flowOf(null)) { a, b, c, d ->
+            emit(a + b + c + d)
+        }
+        assertEquals("123null", flow.single())
+    }
+
+    @Test
+    fun testFiveParameters() = runTest {
+        val flow = combine(flowOf("1"), flowOf(2), flowOf("3"), flowOf(4.toByte()), flowOf(null)) { a, b, c, d, e ->
+                a + b + c + d + e
+            }
+        assertEquals("1234null", flow.single())
+    }
+
+    @Test
+    fun testFiveParametersTransform() = runTest {
+        val flow =
+            combineTransform(flowOf("1"), flowOf(2), flowOf("3"), flowOf(4.toByte()), flowOf(null)) { a, b, c, d, e ->
+                emit(a + b + c + d + e)
+            }
+        assertEquals("1234null", flow.single())
+    }
+
+    @Test
+    fun testNonMatchingTypes() = runTest {
+        val flow = combine(flowOf(1), flowOf("2")) { args: Array<Any?> ->
+            args[0]?.toString() + args[1]?.toString()
+        }
+        assertEquals("12", flow.single())
+    }
+
+    @Test
+    fun testNonMatchingTypesIterable() = runTest {
+        val flow = combine(listOf(flowOf(1), flowOf("2"))) { args: Array<Any?> ->
+            args[0]?.toString() + args[1]?.toString()
+        }
+        assertEquals("12", flow.single())
+    }
+
+    @Test
+    fun testVararg() = runTest {
+        val flow = combine(
+            flowOf("1"),
+            flowOf(2),
+            flowOf("3"),
+            flowOf(4.toByte()),
+            flowOf("5"),
+            flowOf(null)
+        ) { arr -> arr.joinToString("") }
+        assertEquals("12345null", flow.single())
+    }
+
+    @Test
+    fun testVarargTransform() = runTest {
+        val flow = combineTransform(
+            flowOf("1"),
+            flowOf(2),
+            flowOf("3"),
+            flowOf(4.toByte()),
+            flowOf("5"),
+            flowOf(null)
+        ) { arr -> emit(arr.joinToString("")) }
+        assertEquals("12345null", flow.single())
+    }
+
+    @Test
+    fun testEmptyVararg() = runTest {
+        val list = combine(flowOf(1, 2, 3)) { args: Array<Any?> -> args[0] }.toList()
+        assertEquals(listOf(1, 2, 3), list)
+    }
+
+    @Test
+    fun testEmptyVarargTransform() = runTest {
+        val list = combineTransform(flowOf(1, 2, 3)) { args: Array<Any?> -> emit(args[0]) }.toList()
+        assertEquals(listOf(1, 2, 3), list)
+    }
+
+    @Test
+    fun testReified() = runTest {
+        val value = combine(flowOf(1), flowOf(2)) { args: Array<Int> ->
+            @Suppress("USELESS_IS_CHECK")
+            assertTrue(args is Array<Int>)
+            args[0] + args[1]
+        }.single()
+        assertEquals(3, value)
+    }
+
+    @Test
+    fun testReifiedTransform() = runTest {
+        val value = combineTransform(flowOf(1), flowOf(2)) { args: Array<Int> ->
+            @Suppress("USELESS_IS_CHECK")
+            assertTrue(args is Array<Int>)
+            emit(args[0] + args[1])
+        }.single()
+        assertEquals(3, value)
+    }
+
+    @Test
+    fun testEmpty() = runTest {
+        val value = combineTransform { args: Array<Int> ->
+            emit(args[0] + args[1])
+        }.singleOrNull()
+        assertNull(value)
+    }
+
+    @Test
+    fun testEmptyIterable() = runTest {
+        val value = combineTransform(emptyList()) { args: Array<Int> ->
+            emit(args[0] + args[1])
+        }.singleOrNull()
+        assertNull(value)
+    }
+
+    @Test
+    fun testEmptyReified() = runTest {
+        val value = combineTransform { args: Array<Int> ->
+            emit(args[0] + args[1])
+        }.singleOrNull()
+        assertNull(value)
+    }
+
+    @Test
+    fun testEmptyIterableReified() = runTest {
+        val value = combineTransform(emptyList()) { args: Array<Int> ->
+            emit(args[0] + args[1])
+        }.singleOrNull()
+        assertNull(value)
+    }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt
similarity index 80%
rename from kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
rename to kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt
index 54244f0..637cb3d 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt
@@ -6,12 +6,13 @@
 
 import kotlinx.coroutines.*
 import kotlin.test.*
-import kotlinx.coroutines.flow.combineLatest as combineLatestOriginal
+import kotlinx.coroutines.flow.combine as combineOriginal
+import kotlinx.coroutines.flow.combineTransform as combineTransformOriginal
 
 /*
  * Replace:  { i, j -> i + j } ->  { i, j -> i + j } as soon as KT-30991 is fixed
  */
-abstract class CombineLatestTestBase : TestBase() {
+abstract class CombineTestBase : TestBase() {
 
     abstract fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
 
@@ -239,11 +240,33 @@
     }
 }
 
-class CombineLatestTest : CombineLatestTestBase() {
-    override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = combineLatestOriginal(other, transform)
+class CombineTest : CombineTestBase() {
+    override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = combineOriginal(other, transform)
 }
 
-class CombineLatestVarargAdapterTest : CombineLatestTestBase() {
+class CombineTransformTest : CombineTestBase() {
+    override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = combineTransformOriginal(other) { a, b ->
+        emit(transform(a, b))
+    }
+}
+
+class CombineVarargAdapterTest : CombineTestBase() {
     override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
-        (this as Flow<*>).combineLatestOriginal(other) { args: Array<Any?> -> transform(args[0] as T1, args[1] as T2) }
-}
\ No newline at end of file
+        combineOriginal(this, other) { args: Array<Any?> -> transform(args[0] as T1, args[1] as T2) }
+}
+
+class CombineIterableTest : CombineTestBase() {
+    override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+        combineOriginal(listOf(this, other)) { args -> transform(args[0] as T1, args[1] as T2) }
+}
+
+class CombineTransformVarargAdapterTest : CombineTestBase() {
+    override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+        combineTransformOriginal(this, other) { args: Array<Any?> -> emit(transform(args[0] as T1, args[1] as T2)) }
+}
+
+class CombineTransformIterableTest : CombineTestBase() {
+    override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+        combineTransformOriginal(listOf(this, other)) { args -> emit(transform(args[0] as T1, args[1] as T2)) }
+}
+