combineLatest rework

  * Operator renamed to combine
  * Introduced combineTransform operator with custom transformer
  * Decouple API and implementation details to improve user experience from IDE
  * combine(Iterable<Flow>) and combineTransform(Iterable<Flow>) are introduced

Fixes #1224
Fixes #1262
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
new file mode 100644
index 0000000..28b319d
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.internal.Symbol
+import kotlinx.coroutines.selects.*
+
+internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
+
+internal suspend inline fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
+    first: Flow<T1>, second: Flow<T2>,
+    crossinline transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
+) {
+    coroutineScope {
+        val firstChannel = asFairChannel(first)
+        val secondChannel = asFairChannel(second)
+        var firstValue: Any? = null
+        var secondValue: Any? = null
+        var firstIsClosed = false
+        var secondIsClosed = false
+        while (!firstIsClosed || !secondIsClosed) {
+            select<Unit> {
+                onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
+                    firstValue = value
+                    if (secondValue !== null) {
+                        transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2)
+                    }
+                }
+
+                onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
+                    secondValue = value
+                    if (firstValue !== null) {
+                        transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2)
+                    }
+                }
+            }
+        }
+    }
+}
+
+@PublishedApi
+internal fun <T, R> combine(
+    vararg flows: Flow<T>,
+    arrayFactory: () -> Array<T?>,
+    transform: suspend FlowCollector<R>.(Array<T>) -> Unit
+): Flow<R> = flow {
+    coroutineScope {
+        val size = flows.size
+        val channels =
+            Array(size) { asFairChannel(flows[it]) }
+        val latestValues = arrayOfNulls<Any?>(size)
+        val isClosed = Array(size) { false }
+
+        // See flow.combine(other) for explanation.
+        while (!isClosed.all { it }) {
+            select<Unit> {
+                for (i in 0 until size) {
+                    onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
+                        latestValues[i] = value
+                        if (latestValues.all { it !== null }) {
+                            val arguments = arrayFactory()
+                            for (index in 0 until size) {
+                                arguments[index] = NULL.unbox(latestValues[index])
+                            }
+                            transform(arguments as Array<T>)
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+private inline fun SelectBuilder<Unit>.onReceive(
+    isClosed: Boolean,
+    channel: ReceiveChannel<Any>,
+    crossinline onClosed: () -> Unit,
+    noinline onReceive: suspend (value: Any) -> Unit
+) {
+    if (isClosed) return
+    channel.onReceiveOrNull {
+        // TODO onReceiveOrClosed when boxing issues are fixed
+        if (it === null) onClosed()
+        else onReceive(it)
+    }
+}
+
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
+    val channel = channel as ChannelCoroutine<Any>
+    flow.collect { value ->
+        return@collect channel.sendFair(value ?: NULL)
+    }
+}
+
+internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = unsafeFlow {
+    coroutineScope {
+        val first = asChannel(flow)
+        val second = asChannel(flow2)
+        /*
+         * This approach only works with rendezvous channel and is required to enforce correctness
+         * in the following scenario:
+         * ```
+         * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
+         * val f2 = flowOf(1)
+         * f1.zip(f2) { ... }
+         * ```
+         *
+         * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
+         */
+        (second as SendChannel<*>).invokeOnClose {
+            if (!first.isClosedForReceive) first.cancel(AbortFlowException())
+        }
+
+        val otherIterator = second.iterator()
+        try {
+            first.consumeEach { value ->
+                if (!otherIterator.hasNext()) {
+                    return@consumeEach
+                }
+                emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
+            }
+        } catch (e: AbortFlowException) {
+            // complete
+        } finally {
+            if (!second.isClosedForReceive) second.cancel(AbortFlowException())
+        }
+    }
+}
+
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
+    flow.collect { value ->
+        return@collect channel.send(value ?: NULL)
+    }
+}