blob: f55e8beeb2e9e227bdfd023518dfa472850e9874 [file] [log] [blame]
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.test.*
class OnCompletionTest : TestBase() {
@Test
fun testOnCompletion() = runTest {
flow {
expect(1)
emit(2)
expect(4)
}.onEach {
expect(2)
}.onCompletion {
assertNull(it)
expect(5)
}.onEach {
expect(3)
}.collect()
finish(6)
}
@Test
fun testOnCompletionWithException() = runTest {
flowOf(1).onEach {
expect(1)
throw TestException()
}.onCompletion {
assertTrue(it is TestException)
expect(2)
}.catch {
assertTrue(it is TestException)
expect(3)
}.collect()
finish(4)
}
@Test
fun testOnCompletionWithExceptionDownstream() = runTest {
flow {
expect(1)
emit(2)
}.onEach {
expect(2)
}.onCompletion {
assertTrue(it is TestException) // flow fails because of this exception
expect(4)
}.onEach {
expect(3)
throw TestException()
}.catch {
assertTrue(it is TestException)
expect(5)
}.collect()
finish(6)
}
@Test
fun testMultipleOnCompletions() = runTest {
flowOf(1).onCompletion {
assertTrue(it is TestException)
expect(2)
}.onEach {
expect(1)
throw TestException()
}.onCompletion {
assertTrue(it is TestException)
expect(3)
}.catch {
assertTrue(it is TestException)
expect(4)
}.collect()
finish(5)
}
@Test
fun testExceptionFromOnCompletion() = runTest {
flowOf(1).onEach {
expect(1)
throw TestException()
}.onCompletion {
expect(2)
throw TestException2()
}.catch {
assertTrue(it is TestException2)
expect(3)
}.collect()
finish(4)
}
@Test
fun testContextPreservation() = runTest {
flowOf(1).onCompletion {
assertEquals("OK", NamedDispatchers.name())
assertNull(it)
expect(1)
}.flowOn(NamedDispatchers("OK"))
.onEach {
expect(2)
assertEquals("default", NamedDispatchers.nameOr("default"))
throw TestException()
}
.catch {
assertTrue(it is TestException)
expect(3)
}.collect()
finish(4)
}
@Test
fun testEmitExample() = runTest {
val flow = flowOf("a", "b", "c")
.onCompletion() { emit("Done") }
assertEquals(listOf("a", "b", "c", "Done"), flow.toList())
}
sealed class TestData {
data class Value(val i: Int) : TestData()
data class Done(val e: Throwable?) : TestData() {
override fun equals(other: Any?): Boolean =
other is Done && other.e?.message == e?.message
}
}
@Test
fun testCrashedEmit() = runTest {
expect(1)
val collected = ArrayList<TestData>()
assertFailsWith<TestException> {
(1..10).asFlow()
.map<Int, TestData> { TestData.Value(it) }
.onEach { value ->
value as TestData.Value
expect(value.i + 1)
if (value.i == 6) throw TestException("OK")
yield()
}
.onCompletion { e ->
expect(8)
assertTrue(e is TestException)
emit(TestData.Done(e)) // will fail
}.collect {
collected += it
}
}
val expected: List<TestData> = (1..5).map { TestData.Value(it) }
assertEquals(expected, collected)
finish(9)
}
@Test
fun testCancelledEmit() = runTest {
expect(1)
val collected = ArrayList<TestData>()
assertFailsWith<JobCancellationException> {
coroutineScope {
(1..10).asFlow()
.map<Int, TestData> { TestData.Value(it) }
.onEach { value ->
value as TestData.Value
expect(value.i + 1)
if (value.i == 6) coroutineContext.cancel()
yield()
}
.onCompletion { e ->
expect(8)
assertTrue(e is CancellationException)
try {
emit(TestData.Done(e))
expectUnreached()
} finally {
expect(9)
}
}.collect {
collected += it
}
}
}
val expected = (1..5).map<Int, TestData> { TestData.Value(it) }
assertEquals(expected, collected)
finish(10)
}
@Test
fun testFailedEmit() = runTest {
val cause = TestException()
assertFailsWith<TestException> {
flow<TestData> {
expect(1)
emit(TestData.Value(2))
expectUnreached()
}.onCompletion {
assertSame(cause, it) // flow failed because of the exception in downstream
expect(3)
try {
emit(TestData.Done(it))
expectUnreached()
} catch (e: TestException) {
assertSame(cause, e)
finish(4)
}
}.collect {
expect((it as TestData.Value).i)
throw cause
}
}
}
@Test
fun testFirst() = runTest {
val value = flowOf(239).onCompletion {
assertNotNull(it) // the flow did not complete normally
expect(1)
try {
emit(42)
expectUnreached()
} catch (e: Throwable) {
assertTrue { e is AbortFlowException }
}
}.first()
assertEquals(239, value)
finish(2)
}
@Test
fun testSingle() = runTest {
assertFailsWith<IllegalArgumentException> {
flowOf(239).onCompletion {
assertNull(it)
expect(1)
try {
emit(42)
expectUnreached()
} catch (e: Throwable) {
// Second emit -- failure
assertTrue { e is IllegalArgumentException }
throw e
}
}.single()
expectUnreached()
}
finish(2)
}
@Test
fun testEmptySingleInterference() = runTest {
val value = emptyFlow<Int>().onCompletion {
assertNull(it)
expect(1)
emit(42)
}.single()
assertEquals(42, value)
finish(2)
}
@Test
fun testTransparencyViolation() = runTest {
val flow = emptyFlow<Int>().onCompletion {
expect(2)
coroutineScope {
launch {
try {
emit(1)
} catch (e: IllegalStateException) {
expect(3)
}
}
}
}
expect(1)
assertNull(flow.singleOrNull())
finish(4)
}
@Test
fun testTakeOnCompletion() = runTest {
// even though it uses "take" from the outside it completes normally
val flow = (1..10).asFlow().take(5)
val result = flow.onCompletion { cause ->
assertNull(cause)
emit(-1)
}.toList()
val expected = (1..5).toList() + (-1)
assertEquals(expected, result)
}
}