combineLatest rework
* Operator renamed to combine
* Introduced combineTransform operator with custom transformer
* Decoupled the API from implementation details to improve the user experience in the IDE
* combine(Iterable<Flow>) and combineTransform(Iterable<Flow>) are introduced
Fixes #1224
Fixes #1262
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index 0b48b97..a579c55 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -364,6 +364,50 @@
)
public fun <T> Flow<T>.concatWith(other: Flow<T>): Flow<T> = noImpl()
+@Deprecated(
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue of 'combineLatest' is 'combine'",
+ replaceWith = ReplaceWith("this.combine(other, transform)")
+)
+public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+ combine(this, other, transform)
+
+@Deprecated( // Migration stub: any use is a compile error that points callers at 'combine'
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue of 'combineLatest' is 'combine'",
+ replaceWith = ReplaceWith("combine(this, other, other2, transform)")
+)
+public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
+ other: Flow<T2>,
+ other2: Flow<T3>,
+ crossinline transform: suspend (T1, T2, T3) -> R
+): Flow<R> = combine(this, other, other2, transform) // explicit return type, consistent with the 2- and 5-flow stubs
+
+@Deprecated( // Migration stub: any use is a compile error that points callers at 'combine'
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue of 'combineLatest' is 'combine'",
+ replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
+)
+public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
+ other: Flow<T2>,
+ other2: Flow<T3>,
+ other3: Flow<T4>,
+ crossinline transform: suspend (T1, T2, T3, T4) -> R
+): Flow<R> = combine(this, other, other2, other3, transform) // explicit return type, consistent with the 2- and 5-flow stubs
+
+@Deprecated( // Migration stub: any use is a compile error that points callers at 'combine'
+ level = DeprecationLevel.ERROR,
+ message = "Flow analogue of 'combineLatest' is 'combine'",
+ replaceWith = ReplaceWith("combine(this, other, other2, other3, other4, transform)") // must list all five flows
+)
+public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
+ other: Flow<T2>,
+ other2: Flow<T3>,
+ other3: Flow<T4>,
+ other4: Flow<T5>,
+ crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
+): Flow<R> = combine(this, other, other2, other3, other4, transform)
+
/**
* Delays the emission of values from this flow for the given [timeMillis].
* Use `onStart { delay(timeMillis) }`.
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
new file mode 100644
index 0000000..28b319d
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.internal.Symbol
+import kotlinx.coroutines.selects.*
+
+internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
+
+internal suspend inline fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
+ first: Flow<T1>, second: Flow<T2>,
+ crossinline transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
+) { // Collects both flows through channels and calls [transform] with the most recent value of each
+ coroutineScope {
+ val firstChannel = asFairChannel(first)
+ val secondChannel = asFairChannel(second)
+ var firstValue: Any? = null // null here means "nothing received yet"; asFairChannel never sends a raw null
+ var secondValue: Any? = null
+ var firstIsClosed = false
+ var secondIsClosed = false
+ while (!firstIsClosed || !secondIsClosed) { // keep selecting until both channels have reported closure
+ select<Unit> {
+ onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
+ firstValue = value
+ if (secondValue !== null) { // transform runs only once the other flow has produced a value
+ transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2)
+ }
+ }
+
+ onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
+ secondValue = value
+ if (firstValue !== null) { // symmetric to the first clause
+ transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2)
+ }
+ }
+ }
+ }
+ }
+}
+
+@PublishedApi
+internal fun <T, R> combine(
+ vararg flows: Flow<T>,
+ arrayFactory: () -> Array<T?>,
+ transform: suspend FlowCollector<R>.(Array<T>) -> Unit
+): Flow<R> = flow {
+ coroutineScope {
+ val size = flows.size
+ val channels =
+ Array(size) { asFairChannel(flows[it]) }
+ val latestValues = arrayOfNulls<Any?>(size) // a null slot means that flow has not emitted yet
+ val isClosed = Array(size) { false }
+
+ // Hand-rolled select loop: each pass registers a clause only for channels that are not closed yet.
+ while (!isClosed.all { it }) { // with no flows at all, 'all' is true immediately and nothing is emitted
+ select<Unit> {
+ for (i in 0 until size) {
+ onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
+ latestValues[i] = value
+ if (latestValues.all { it !== null }) { // every flow has produced at least one value
+ val arguments = arrayFactory() // a new array is requested for every transform call
+ for (index in 0 until size) {
+ arguments[index] = NULL.unbox(latestValues[index])
+ }
+ transform(arguments as Array<T>)
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+private inline fun SelectBuilder<Unit>.onReceive( // Registers a receive clause for [channel] unless it is already closed
+ isClosed: Boolean,
+ channel: ReceiveChannel<Any>,
+ crossinline onClosed: () -> Unit,
+ noinline onReceive: suspend (value: Any) -> Unit
+) {
+ if (isClosed) return // no clause is added for a channel already marked closed
+ channel.onReceiveOrNull {
+ // TODO onReceiveOrClosed when boxing issues are fixed
+ if (it === null) onClosed() // null is treated as the "channel closed" signal
+ else onReceive(it)
+ }
+}
+
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
+ val channel = channel as ChannelCoroutine<Any> // cast to reach sendFair — presumably not exposed on the plain channel type; confirm
+ flow.collect { value ->
+ return@collect channel.sendFair(value ?: NULL) // null elements travel as the NULL marker, so null can signal closure to the receiver
+ }
+}
+
+internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = unsafeFlow {
+ coroutineScope {
+ val first = asChannel(flow)
+ val second = asChannel(flow2)
+ /*
+ * This approach only works with rendezvous channel and is required to enforce correctness
+ * in the following scenario:
+ * ```
+ * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
+ * val f2 = flowOf(1)
+ * f1.zip(f2) { ... }
+ * ```
+ *
+ * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
+ */
+ (second as SendChannel<*>).invokeOnClose {
+ if (!first.isClosedForReceive) first.cancel(AbortFlowException()) // second is done: stop the first producer as well
+ }
+
+ val otherIterator = second.iterator()
+ try {
+ first.consumeEach { value ->
+ if (!otherIterator.hasNext()) { // second has no more elements: skip without emitting
+ return@consumeEach
+ }
+ emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next()))) // unbox presumably maps the NULL marker back to null — confirm
+ }
+ } catch (e: AbortFlowException) {
+ // complete
+ } finally {
+ if (!second.isClosedForReceive) second.cancel(AbortFlowException()) // first finished or failed: stop the second producer
+ }
+ }
+}
+
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
+ flow.collect { value ->
+ return@collect channel.send(value ?: NULL) // null elements are replaced by the NULL marker so only non-null values are sent
+ }
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
index f3a1126..62d5e4c 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
@@ -24,7 +24,6 @@
* generic function that may transform emitted element, skip it or emit it multiple times.
*
* This operator can be used as a building block for other operators, for example:
- *
* ```
* fun Flow<Int>.skipOddAndDuplicateEven(): Flow<Int> = transform { value ->
* if (value % 2 == 0) { // Emit only even values, but twice
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
index 72822bb..a3f5830 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
@@ -4,15 +4,14 @@
@file:JvmMultifileClass
@file:JvmName("FlowKt")
-@file:Suppress("UNCHECKED_CAST")
+@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
-import kotlinx.coroutines.selects.*
import kotlin.jvm.*
+import kotlinx.coroutines.flow.flow as safeFlow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
/**
@@ -23,69 +22,123 @@
* ```
* val flow = flowOf(1, 2).delayEach(10)
* val flow2 = flowOf("a", "b", "c").delayEach(15)
- * flow.combineLatest(flow2) { i, s -> i.toString() + s }.collect {
+ * flow.combine(flow2) { i, s -> i.toString() + s }.collect {
* println(it) // Will print "1a 2a 2b 2c"
* }
* ```
+ *
+ * This function is a shorthand for `flow.combineTransform(flow2) { a, b -> emit(transform(a, b)) }`.
*/
-@FlowPreview
-public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
- coroutineScope {
- val firstChannel = asFairChannel(this@combineLatest)
- val secondChannel = asFairChannel(other)
- var firstValue: Any? = null
- var secondValue: Any? = null
- var firstIsClosed = false
- var secondIsClosed = false
-
- /*
- * Fun fact, this select **semantically** equivalent of the following:
- * ```
- * selectWhile<Unit> {
- * channel.onReceive {
- * emitCombined(...)
- * }
- * channel2.onReceive {
- * emitCombined(...)
- * }
- * }
- * ```
- * but we are waiting for `channels` branch to get merged where we will change semantics of the select
- * to ignore finished clauses.
- *
- * Instead (especially in the face of non-fair channels) we are using our own hand-rolled select emulation
- * on top of previous select.
- */
- while (!firstIsClosed || !secondIsClosed) {
- select<Unit> {
- onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
- firstValue = value
- if (secondValue !== null) {
- emit(transform(NULL.unbox(firstValue), NULL.unbox(secondValue)))
- }
- }
-
- onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
- secondValue = value
- if (firstValue !== null) {
- emit(transform(NULL.unbox(firstValue), NULL.unbox(secondValue)))
- }
- }
- }
- }
+@JvmName("flowCombine")
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow {
+ combineTransformInternal(this@combine, flow) { a, b ->
+ emit(transform(a, b))
}
}
/**
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
+ *
+ * It can be demonstrated with the following example:
+ * ```
+ * val flow = flowOf(1, 2).delayEach(10)
+ * val flow2 = flowOf("a", "b", "c").delayEach(15)
+ * combine(flow, flow2) { i, s -> i.toString() + s }.collect {
+ * println(it) // Will print "1a 2a 2b 2c"
+ * }
+ * ```
+ *
+ * This function is a shorthand for `combineTransform(flow, flow2) { a, b -> emit(transform(a, b)) }`.
*/
-@FlowPreview
-public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
- other: Flow<T2>,
- other2: Flow<T3>,
- crossinline transform: suspend (T1, T2, T3) -> R
-): Flow<R> = (this as Flow<*>).combineLatest(other, other2) { args: Array<*> ->
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> =
+ flow.combine(flow2, transform)
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that processes the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ *
+ * Its usage can be demonstrated with the following example:
+ * ```
+ * val flow = requestFlow()
+ * val flow2 = searchEngineFlow()
+ * flow.combineTransform(flow2) { request, searchEngine ->
+ * emit("Downloading in progress")
+ * val result = download(request, searchEngine)
+ * emit(result)
+ * }
+ * ```
+ */
+@JvmName("flowCombineTransform")
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> Flow<T1>.combineTransform(
+ flow: Flow<T2>,
+ @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
+): Flow<R> = safeFlow {
+ combineTransformInternal(this@combineTransform, flow) { a, b ->
+ transform(a, b)
+ }
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that processes the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ *
+ * Its usage can be demonstrated with the following example:
+ * ```
+ * val flow = requestFlow()
+ * val flow2 = searchEngineFlow()
+ * combineTransform(flow, flow2) { request, searchEngine ->
+ * emit("Downloading in progress")
+ * val result = download(request, searchEngine)
+ * emit(result)
+ * }
+ * ```
+ */
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> combineTransform(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
+): Flow<R> = flow.combineTransform(flow2, transform) // delegate to the extension overload; the unqualified call resolved back to this function
+
+/**
+ * Returns a [Flow] whose values are generated with [transform] function by combining
+ * the most recently emitted values by each flow.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, R> combine(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ flow3: Flow<T3>,
+ @BuilderInference crossinline transform: suspend (T1, T2, T3) -> R
+): Flow<R> = combine(flow, flow2, flow3) { args: Array<*> ->
+ transform(
+ args[0] as T1,
+ args[1] as T2,
+ args[2] as T3
+ )
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that processes the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, R> combineTransform(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ flow3: Flow<T3>,
+ @BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
+): Flow<R> = combineTransform(flow, flow2, flow3) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
@@ -97,13 +150,36 @@
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
-@FlowPreview
-public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
- other: Flow<T2>,
- other2: Flow<T3>,
- other3: Flow<T4>,
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, T4, R> combine(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ flow3: Flow<T3>,
+ flow4: Flow<T4>,
crossinline transform: suspend (T1, T2, T3, T4) -> R
-): Flow<R> = (this as Flow<*>).combineLatest(other, other2, other3) { args: Array<*> ->
+): Flow<R> = combine(flow, flow2, flow3, flow4) { args: Array<*> ->
+ transform(
+ args[0] as T1,
+ args[1] as T2,
+ args[2] as T3,
+ args[3] as T4
+ )
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that processes the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, T4, R> combineTransform(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ flow3: Flow<T3>,
+ flow4: Flow<T4>,
+ @BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
+): Flow<R> = combineTransform(flow, flow2, flow3, flow4) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
@@ -116,14 +192,39 @@
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
-@FlowPreview
-public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
- other: Flow<T2>,
- other2: Flow<T3>,
- other3: Flow<T4>,
- other4: Flow<T5>,
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, T4, T5, R> combine(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ flow3: Flow<T3>,
+ flow4: Flow<T4>,
+ flow5: Flow<T5>,
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
-): Flow<R> = (this as Flow<*>).combineLatest(other, other2, other3, other4) { args: Array<*> ->
+): Flow<R> = combine(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
+ transform(
+ args[0] as T1,
+ args[1] as T2,
+ args[2] as T3,
+ args[3] as T4,
+ args[4] as T5
+ )
+}
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that processes the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <T1, T2, T3, T4, T5, R> combineTransform(
+ flow: Flow<T1>,
+ flow2: Flow<T2>,
+ flow3: Flow<T3>,
+ flow4: Flow<T4>,
+ flow5: Flow<T5>,
+ @BuilderInference crossinline transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
+): Flow<R> = combineTransform(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
transform(
args[0] as T1,
args[1] as T2,
@@ -137,64 +238,68 @@
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
-@FlowPreview
-public inline fun <reified T, R> Flow<T>.combineLatest(vararg others: Flow<T>, crossinline transform: suspend (Array<T>) -> R): Flow<R> =
- combineLatest(*others, arrayFactory = { arrayOfNulls(others.size + 1) }, transform = { transform(it) })
+@ExperimentalCoroutinesApi
+public inline fun <reified T, R> combine(
+ vararg flows: Flow<T>,
+ crossinline transform: suspend (Array<T>) -> R
+): Flow<R> = combine(*flows, arrayFactory = { arrayOfNulls(flows.size) }, transform = { emit(transform(it)) })
+
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that processes the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <reified T, R> combineTransform(
+ vararg flows: Flow<T>,
+ @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
+): Flow<R> = combine(*flows, arrayFactory = { arrayOfNulls(flows.size) }, transform = { transform(it) })
/**
* Returns a [Flow] whose values are generated with [transform] function by combining
* the most recently emitted values by each flow.
*/
-@PublishedApi
-internal fun <T, R> Flow<T>.combineLatest(vararg others: Flow<T>, arrayFactory: () -> Array<T?>, transform: suspend (Array<T>) -> R): Flow<R> = flow {
- coroutineScope {
- val size = others.size + 1
- val channels =
- Array(size) { if (it == 0) asFairChannel(this@combineLatest) else asFairChannel(others[it - 1]) }
- val latestValues = arrayOfNulls<Any?>(size)
- val isClosed = Array(size) { false }
-
- // See flow.combineLatest(other) for explanation.
- while (!isClosed.all { it }) {
- select<Unit> {
- for (i in 0 until size) {
- onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
- latestValues[i] = value
- if (latestValues.all { it !== null }) {
- val arguments = arrayFactory()
- for (index in 0 until size) {
- arguments[index] = NULL.unbox(latestValues[index])
- }
- emit(transform(arguments as Array<T>))
- }
- }
- }
- }
- }
- }
+@ExperimentalCoroutinesApi
+public inline fun <reified T, R> combine(
+ flows: Iterable<Flow<T>>,
+ crossinline transform: suspend (Array<T>) -> R
+): Flow<R> {
+ val flowArray = flows.toList().toTypedArray()
+ return combine(*flowArray, arrayFactory = { arrayOfNulls(flowArray.size) }, transform = { emit(transform(it)) })
}
-private inline fun SelectBuilder<Unit>.onReceive(
- isClosed: Boolean,
- channel: ReceiveChannel<Any>,
- crossinline onClosed: () -> Unit,
- noinline onReceive: suspend (value: Any) -> Unit
-) {
- if (isClosed) return
- channel.onReceiveOrNull {
- if (it === null) onClosed()
- else onReceive(it)
- }
+/**
+ * Returns a [Flow] whose values are generated by [transform] function that processes the most recently emitted values by each flow.
+ *
+ * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
+ * generic function that may transform emitted element, skip it or emit it multiple times.
+ */
+@ExperimentalCoroutinesApi
+public inline fun <reified T, R> combineTransform(
+ flows: Iterable<Flow<T>>,
+ @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
+): Flow<R> {
+ val flowArray = flows.toList().toTypedArray()
+ return combine(*flowArray, arrayFactory = { arrayOfNulls(flowArray.size) }, transform = { transform(it) })
}
-// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
-private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
- val channel = channel as ChannelCoroutine<Any>
- flow.collect { value ->
- channel.sendFair(value ?: NULL)
- }
-}
-
+/**
+ * Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
+ * The resulting flow completes as soon as one of the flows completes and cancel is called on the remaining flow.
+ *
+ * It can be demonstrated with the following example:
+ * ```
+ * val flow = flowOf(1, 2, 3).delayEach(10)
+ * val flow2 = flowOf("a", "b", "c", "d").delayEach(15)
+ * flow.zip(flow2) { i, s -> i.toString() + s }.collect {
+ * println(it) // Will print "1a 2b 3c"
+ * }
+ * ```
+ */
+@JvmName("flowZip")
+@ExperimentalCoroutinesApi
+public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
/**
* Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
@@ -210,45 +315,4 @@
* ```
*/
@ExperimentalCoroutinesApi
-public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = flow {
- coroutineScope {
- val first = asChannel(this@zip)
- val second = asChannel(other)
- /*
- * This approach only works with rendezvous channel and is required to enforce correctness
- * in the following scenario:
- * ```
- * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
- * val f2 = flowOf(1)
- * f1.zip(f2) { ... }
- * ```
- *
- * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
- */
- (second as SendChannel<*>).invokeOnClose {
- if (!first.isClosedForReceive) first.cancel(AbortFlowException())
- }
-
- val otherIterator = second.iterator()
- try {
- first.consumeEach { value ->
- if (!otherIterator.hasNext()) {
- return@consumeEach
- }
- val secondValue = NULL.unbox<T2>(otherIterator.next())
- emit(transform(NULL.unbox(value), NULL.unbox(secondValue)))
- }
- } catch (e: AbortFlowException) {
- // complete
- } finally {
- if (!second.isClosedForReceive) second.cancel(AbortFlowException())
- }
- }
-}
-
-// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
-private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
- flow.collect { value ->
- channel.send(value ?: NULL)
- }
-}
+public fun <T1, T2, R> zip(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(flow, flow2, transform)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt
deleted file mode 100644
index 37726fa..0000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestVarargTest.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow.operators
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.*
-import kotlin.test.*
-
-class CombineLatestVarargTest : TestBase() {
-
- @Test
- fun testThreeParameters() = runTest {
- val flow = flowOf("1").combineLatest(flowOf(2), flowOf(null)) { a, b, c ->
- a + b + c
- }
-
- assertEquals("12null", flow.single())
- }
-
- @Test
- fun testFourParameters() = runTest {
- val flow = flowOf("1").combineLatest(flowOf(2), flowOf("3"), flowOf(null)) { a, b, c, d ->
- a + b + c + d
- }
-
- assertEquals("123null", flow.single())
- }
-
- @Test
- fun testFiveParameters() = runTest {
- val flow =
- flowOf("1").combineLatest(flowOf(2), flowOf("3"), flowOf(4.toByte()), flowOf(null)) { a, b, c, d, e ->
- a + b + c + d + e
- }
-
- assertEquals("1234null", flow.single())
- }
-
- @Test
- fun testVararg() = runTest {
- val flow = flowOf("1").combineLatest(
- flowOf(2),
- flowOf("3"),
- flowOf(4.toByte()),
- flowOf("5"),
- flowOf(null)
- ) { arr -> arr.joinToString("") }
- assertEquals("12345null", flow.single())
- }
-
- @Test
- fun testEmptyVararg() = runTest {
- val list = flowOf(1, 2, 3).combineLatest { args: Array<Any?> -> args[0] }.toList()
- assertEquals(listOf(1, 2, 3), list)
- }
-
- @Test
- fun testNonNullableAny() = runTest {
- val value = flowOf(1).combineLatest(flowOf(2)) { args: Array<Int> ->
- @Suppress("USELESS_IS_CHECK")
- assertTrue(args is Array<Int>)
- args[0] + args[1]
- }.single()
- assertEquals(3, value)
- }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt
new file mode 100644
index 0000000..a987c83
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineParametersTestBase.kt
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.operators
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+
+class CombineParametersTest : TestBase() {
+
+ @Test
+ fun testThreeParameters() = runTest {
+ val flow = combine(flowOf("1"), flowOf(2), flowOf(null)) { a, b, c -> a + b + c }
+ assertEquals("12null", flow.single())
+
+ val flow2 = combineTransform(flowOf("1"), flowOf(2), flowOf(null)) { a, b, c -> emit(a + b + c) }
+ assertEquals("12null", flow2.single())
+ }
+
+ @Test
+ fun testThreeParametersTransform() = runTest {
+ val flow = combineTransform(flowOf("1"), flowOf(2), flowOf(null)) { a, b, c -> emit(a + b + c) }
+ assertEquals("12null", flow.single())
+ }
+
+ @Test
+ fun testFourParameters() = runTest {
+ val flow = combine(flowOf("1"), flowOf(2), flowOf("3"), flowOf(null)) { a, b, c, d -> a + b + c + d }
+ assertEquals("123null", flow.single())
+ }
+
+ @Test
+ fun testFourParametersTransform() = runTest {
+ val flow = combineTransform(flowOf("1"), flowOf(2), flowOf("3"), flowOf(null)) { a, b, c, d ->
+ emit(a + b + c + d)
+ }
+ assertEquals("123null", flow.single())
+ }
+
+ @Test
+ fun testFiveParameters() = runTest {
+ val flow = combine(flowOf("1"), flowOf(2), flowOf("3"), flowOf(4.toByte()), flowOf(null)) { a, b, c, d, e ->
+ a + b + c + d + e
+ }
+ assertEquals("1234null", flow.single())
+ }
+
+ @Test
+ fun testFiveParametersTransform() = runTest {
+ val flow =
+ combineTransform(flowOf("1"), flowOf(2), flowOf("3"), flowOf(4.toByte()), flowOf(null)) { a, b, c, d, e ->
+ emit(a + b + c + d + e)
+ }
+ assertEquals("1234null", flow.single())
+ }
+
+ @Test
+ fun testNonMatchingTypes() = runTest {
+ val flow = combine(flowOf(1), flowOf("2")) { args: Array<Any?> ->
+ args[0]?.toString() + args[1]?.toString()
+ }
+ assertEquals("12", flow.single())
+ }
+
+ @Test
+ fun testNonMatchingTypesIterable() = runTest {
+ val flow = combine(listOf(flowOf(1), flowOf("2"))) { args: Array<Any?> ->
+ args[0]?.toString() + args[1]?.toString()
+ }
+ assertEquals("12", flow.single())
+ }
+
+ @Test
+ fun testVararg() = runTest {
+ val flow = combine(
+ flowOf("1"),
+ flowOf(2),
+ flowOf("3"),
+ flowOf(4.toByte()),
+ flowOf("5"),
+ flowOf(null)
+ ) { arr -> arr.joinToString("") }
+ assertEquals("12345null", flow.single())
+ }
+
+ @Test
+ fun testVarargTransform() = runTest {
+ val flow = combineTransform(
+ flowOf("1"),
+ flowOf(2),
+ flowOf("3"),
+ flowOf(4.toByte()),
+ flowOf("5"),
+ flowOf(null)
+ ) { arr -> emit(arr.joinToString("")) }
+ assertEquals("12345null", flow.single())
+ }
+
+ @Test
+ fun testEmptyVararg() = runTest {
+ val list = combine(flowOf(1, 2, 3)) { args: Array<Any?> -> args[0] }.toList()
+ assertEquals(listOf(1, 2, 3), list)
+ }
+
+ @Test
+ fun testEmptyVarargTransform() = runTest {
+ val list = combineTransform(flowOf(1, 2, 3)) { args: Array<Any?> -> emit(args[0]) }.toList()
+ assertEquals(listOf(1, 2, 3), list)
+ }
+
+ @Test
+ fun testReified() = runTest {
+ val value = combine(flowOf(1), flowOf(2)) { args: Array<Int> ->
+ @Suppress("USELESS_IS_CHECK")
+ assertTrue(args is Array<Int>)
+ args[0] + args[1]
+ }.single()
+ assertEquals(3, value)
+ }
+
+ @Test
+ fun testReifiedTransform() = runTest {
+ val value = combineTransform(flowOf(1), flowOf(2)) { args: Array<Int> ->
+ @Suppress("USELESS_IS_CHECK")
+ assertTrue(args is Array<Int>)
+ emit(args[0] + args[1])
+ }.single()
+ assertEquals(3, value)
+ }
+
+ @Test
+ fun testEmpty() = runTest {
+ val value = combineTransform { args: Array<Int> ->
+ emit(args[0] + args[1])
+ }.singleOrNull()
+ assertNull(value)
+ }
+
+ @Test
+ fun testEmptyIterable() = runTest {
+ val value = combineTransform(emptyList()) { args: Array<Int> ->
+ emit(args[0] + args[1])
+ }.singleOrNull()
+ assertNull(value)
+ }
+
+ @Test
+ fun testEmptyReified() = runTest {
+ val value = combineTransform { args: Array<Int> ->
+ emit(args[0] + args[1])
+ }.singleOrNull()
+ assertNull(value)
+ }
+
+ @Test
+ fun testEmptyIterableReified() = runTest {
+ val value = combineTransform(emptyList()) { args: Array<Int> ->
+ emit(args[0] + args[1])
+ }.singleOrNull()
+ assertNull(value)
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt
similarity index 80%
rename from kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
rename to kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt
index 54244f0..637cb3d 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineTest.kt
@@ -6,12 +6,13 @@
import kotlinx.coroutines.*
import kotlin.test.*
-import kotlinx.coroutines.flow.combineLatest as combineLatestOriginal
+import kotlinx.coroutines.flow.combine as combineOriginal
+import kotlinx.coroutines.flow.combineTransform as combineTransformOriginal
/*
* Replace: { i, j -> i + j } -> { i, j -> i + j } as soon as KT-30991 is fixed
*/
-abstract class CombineLatestTestBase : TestBase() {
+abstract class CombineTestBase : TestBase() {
abstract fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
@@ -239,11 +240,33 @@
}
}
-class CombineLatestTest : CombineLatestTestBase() {
- override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = combineLatestOriginal(other, transform)
+class CombineTest : CombineTestBase() {
+ override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = combineOriginal(other, transform)
}
-class CombineLatestVarargAdapterTest : CombineLatestTestBase() {
+class CombineTransformTest : CombineTestBase() {
+ override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = combineTransformOriginal(other) { a, b ->
+ emit(transform(a, b))
+ }
+}
+
+class CombineVarargAdapterTest : CombineTestBase() {
override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
- (this as Flow<*>).combineLatestOriginal(other) { args: Array<Any?> -> transform(args[0] as T1, args[1] as T2) }
-}
\ No newline at end of file
+ combineOriginal(this, other) { args: Array<Any?> -> transform(args[0] as T1, args[1] as T2) }
+}
+
+class CombineIterableTest : CombineTestBase() {
+ override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+ combineOriginal(listOf(this, other)) { args -> transform(args[0] as T1, args[1] as T2) }
+}
+
+class CombineTransformVarargAdapterTest : CombineTestBase() {
+ override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+ combineTransformOriginal(this, other) { args: Array<Any?> -> emit(transform(args[0] as T1, args[1] as T2)) }
+}
+
+class CombineTransformIterableTest : CombineTestBase() {
+ override fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
+ combineTransformOriginal(listOf(this, other)) { args -> emit(transform(args[0] as T1, args[1] as T2)) }
+}
+