blob: 83f5498e4d1bbbc007a08e373092b4055c0dddf7 [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@file:JvmName("FlowKt")
@file:Suppress("UNCHECKED_CAST")
package kotlinx.coroutines.flow
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.internal.Symbol
import kotlin.jvm.*
/**
 * Combines all elements of the flow into a single value: the first element seeds the accumulator and
 * [operation] is then invoked with the current accumulator and every subsequent element.
 *
 * @param operation produces the next accumulator from the current accumulator and the emitted value.
 * @return the accumulator after the last element has been processed.
 * @throws NoSuchElementException if the flow did not emit any element.
 */
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {
    // NULL marks "nothing collected yet"; it is compared by identity so an emitted `null` is not confused with it.
    var running: Any? = NULL
    collect { element ->
        if (running === NULL) {
            // The first element becomes the initial accumulator without calling [operation].
            running = element
        } else {
            @Suppress("UNCHECKED_CAST")
            val previous = running as S
            running = operation(previous, element)
        }
    }
    if (running === NULL) throw NoSuchElementException("Empty flow can't be reduced")
    @Suppress("UNCHECKED_CAST")
    return running as S
}
/**
 * Accumulates a value starting with [initial] and applying [operation] to the current accumulator value
 * and each element of the flow.
 *
 * @param initial the accumulator value used before any element is processed; it is returned unchanged
 * when [operation] is never invoked.
 * @param operation produces the next accumulator from the current accumulator and the emitted value.
 * @return the accumulator after the last element has been processed.
 */
public suspend inline fun <T, R> Flow<T>.fold(
    initial: R,
    crossinline operation: suspend (acc: R, value: T) -> R
): R {
    var folded = initial
    collect { element -> folded = operation(folded, element) }
    return folded
}
/**
 * The terminal operator that awaits for one and only one value to be emitted.
 *
 * Throws [NoSuchElementException] for an empty flow and [IllegalArgumentException] for a flow
 * that contains more than one element.
 *
 * @return the only value emitted by the flow.
 * @throws NoSuchElementException if the flow completed without emitting any value.
 * @throws IllegalArgumentException if the flow emitted a second value.
 */
public suspend fun <T> Flow<T>.single(): T {
    // NULL marks "no value seen yet"; it is compared by identity so an emitted `null` is not confused with it.
    var result: Any? = NULL
    collect { value ->
        // `require` throws IllegalArgumentException, so a second emission fails with that exception type.
        require(result === NULL) { "Flow has more than one element" }
        result = value
    }
    if (result === NULL) throw NoSuchElementException("Flow is empty")
    return result as T
}
/**
 * The terminal operator that awaits for one and only one value to be emitted.
 *
 * @return the single emitted value, or `null` if the flow was empty or emitted more than one value.
 */
public suspend fun <T> Flow<T>.singleOrNull(): T? {
    // NULL marks "no usable value"; it is compared by identity so an emitted `null` is not confused with it.
    var candidate: Any? = NULL
    collectWhile { element ->
        val isFirstValue = candidate === NULL
        // First value: remember it. Second value: drop what was remembered.
        candidate = if (isFirstValue) element else NULL
        // Keep collecting only after the first value; a second value ends the collection.
        isFirstValue
    }
    if (candidate === NULL) return null
    return candidate as T
}
/**
 * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
 *
 * @return the first emitted element.
 * @throws NoSuchElementException if the flow was empty.
 */
public suspend fun <T> Flow<T>.first(): T {
    // NULL marks "nothing emitted"; it is compared by identity so an emitted `null` is not confused with it.
    var head: Any? = NULL
    collectWhile { element ->
        head = element
        // One element is all that is needed, so do not continue.
        false
    }
    return when {
        head === NULL -> throw NoSuchElementException("Expected at least one element")
        else -> head as T
    }
}
/**
 * The terminal operator that returns the first element emitted by the flow that matches the given [predicate]
 * and then cancels flow's collection.
 *
 * @param predicate decides whether an emitted element is the one to return.
 * @return the first element for which [predicate] returned `true`.
 * @throws NoSuchElementException if the flow has not contained elements matching the [predicate].
 */
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
    // NULL marks "no match yet"; it is compared by identity so a matching `null` element is not confused with it.
    var match: Any? = NULL
    collectWhile { element ->
        val accepted = predicate(element)
        if (accepted) match = element
        // Continue only while nothing has matched.
        !accepted
    }
    if (match === NULL) {
        throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
    }
    return match as T
}
/**
 * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
 *
 * @return the first emitted element, or `null` if the flow was empty.
 */
public suspend fun <T> Flow<T>.firstOrNull(): T? {
    var head: T? = null
    collectWhile { element ->
        head = element
        // One element is all that is needed, so do not continue.
        false
    }
    return head
}
/**
 * The terminal operator that returns the first element emitted by the flow that matches the given [predicate]
 * and then cancels flow's collection.
 *
 * @param predicate decides whether an emitted element is the one to return.
 * @return the first element for which [predicate] returned `true`, or `null` if the flow did not contain
 * an element matching the [predicate].
 */
public suspend fun <T> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
    var match: T? = null
    collectWhile { element ->
        val accepted = predicate(element)
        if (accepted) match = element
        // Continue only while nothing has matched.
        !accepted
    }
    return match
}