Pr/2230 (#2287)
* Allow nullable types in Flow.firstOrNull
* Allow nullable types in Flow.singleOrNull
* Align Flow.single and Flow.singleOrNull with Kotlin standard library
Fixes #2229
Fixes #2289
Co-authored-by: Nicklas Ansman Giertz <nicklas@ansman.se>
diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
index d36e1bb..83f5498 100644
--- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
+++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
@@ -9,6 +9,7 @@
package kotlinx.coroutines.flow
import kotlinx.coroutines.flow.internal.*
+import kotlinx.coroutines.internal.Symbol
import kotlin.jvm.*
/**
@@ -47,33 +48,39 @@
}
/**
- * The terminal operator, that awaits for one and only one value to be published.
+ * The terminal operator that awaits for one and only one value to be emitted.
* Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow
* that contains more than one element.
*/
public suspend fun <T> Flow<T>.single(): T {
var result: Any? = NULL
collect { value ->
- if (result !== NULL) error("Expected only one element")
+ require(result === NULL) { "Flow has more than one element" }
result = value
}
- if (result === NULL) throw NoSuchElementException("Expected at least one element")
- @Suppress("UNCHECKED_CAST")
+ if (result === NULL) throw NoSuchElementException("Flow is empty")
return result as T
}
/**
- * The terminal operator, that awaits for one and only one value to be published.
- * Throws [IllegalStateException] for flow that contains more than one element.
+ * The terminal operator that awaits for one and only one value to be emitted.
+ * Returns the single value or `null`, if the flow was empty or emitted more than one value.
*/
-public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {
- var result: T? = null
- collect { value ->
- if (result != null) error("Expected only one element")
- result = value
+public suspend fun <T> Flow<T>.singleOrNull(): T? {
+ var result: Any? = NULL
+ collectWhile {
+ // No values yet, update result
+ if (result === NULL) {
+ result = it
+ true
+ } else {
+ // Second value, reset result and bail out
+ result = NULL
+ false
+ }
}
- return result
+ return if (result === NULL) null else result as T
}
/**
@@ -112,7 +119,7 @@
* The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
* Returns `null` if the flow was empty.
*/
-public suspend fun <T : Any> Flow<T>.firstOrNull(): T? {
+public suspend fun <T> Flow<T>.firstOrNull(): T? {
var result: T? = null
collectWhile {
result = it
@@ -122,10 +129,10 @@
}
/**
- * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
+ * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
* Returns `null` if the flow did not contain an element matching the [predicate].
*/
-public suspend fun <T : Any> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
+public suspend fun <T> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
var result: T? = null
collectWhile {
if (predicate(it)) {
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt
index 7f0c548..f55e8be 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt
@@ -231,7 +231,7 @@
@Test
fun testSingle() = runTest {
- assertFailsWith<IllegalStateException> {
+ assertFailsWith<IllegalArgumentException> {
flowOf(239).onCompletion {
assertNull(it)
expect(1)
@@ -240,7 +240,7 @@
expectUnreached()
} catch (e: Throwable) {
// Second emit -- failure
- assertTrue { e is IllegalStateException }
+ assertTrue { e is IllegalArgumentException }
throw e
}
}.single()
diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt
index edb9f00..fa7fc9c 100644
--- a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt
@@ -129,6 +129,12 @@
}
@Test
+ fun testFirstOrNullWithNullElement() = runTest {
+ assertNull(flowOf<String?>(null).firstOrNull())
+ assertNull(flowOf<String?>(null).firstOrNull { true })
+ }
+
+ @Test
fun testFirstOrNullWhenErrorCancelsUpstream() = runTest {
val latch = Channel<Unit>()
val flow = flow {
diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt
index 4e89b93..2c1277b 100644
--- a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt
@@ -7,7 +7,7 @@
import kotlinx.coroutines.*
import kotlin.test.*
-class SingleTest : TestBase() {
+class SingleTest : TestBase() {
@Test
fun testSingle() = runTest {
@@ -25,8 +25,8 @@
emit(239L)
emit(240L)
}
- assertFailsWith<RuntimeException> { flow.single() }
- assertFailsWith<RuntimeException> { flow.singleOrNull() }
+ assertFailsWith<IllegalArgumentException> { flow.single() }
+ assertNull(flow.singleOrNull())
}
@Test
@@ -61,6 +61,10 @@
assertEquals(1, flowOf<Int?>(1).single())
assertNull(flowOf<Int?>(null).single())
assertFailsWith<NoSuchElementException> { flowOf<Int?>().single() }
+
+ assertEquals(1, flowOf<Int?>(1).singleOrNull())
+ assertNull(flowOf<Int?>(null).singleOrNull())
+ assertNull(flowOf<Int?>().singleOrNull())
}
@Test
@@ -69,5 +73,22 @@
val flow = flowOf(instance)
assertSame(instance, flow.single())
assertSame(instance, flow.singleOrNull())
+
+ val flow2 = flow {
+ emit(BadClass())
+ emit(BadClass())
+ }
+ assertFailsWith<IllegalArgumentException> { flow2.single() }
+ }
+
+ @Test
+ fun testSingleNoWait() = runTest {
+ val flow = flow {
+ emit(1)
+ emit(2)
+ awaitCancellation()
+ }
+
+ assertNull(flow.singleOrNull())
}
}