blob: 7470289ecee46c76b702d74e15079364abdf0b07 [file] [log] [blame]
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlin.test.*
class FlatMapMergeTest : FlatMapMergeBaseTest() {
override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = flatMapMerge(transform = mapper)
@Test
override fun testFlatMapConcurrency() = runTest {
var concurrentRequests = 0
val flow = (1..100).asFlow().flatMapMerge(concurrency = 2) { value ->
flow {
++concurrentRequests
emit(value)
delay(Long.MAX_VALUE)
}
}
val consumer = launch {
flow.collect { value ->
expect(value)
}
}
repeat(4) {
yield()
}
assertEquals(2, concurrentRequests)
consumer.cancelAndJoin()
finish(3)
}
@Test
fun testAtomicStart() = runTest {
try {
coroutineScope {
val job = coroutineContext[Job]!!
val flow = flow {
expect(3)
emit(1)
}
.onCompletion { expect(5) }
.flatMapMerge {
expect(4)
flowOf(it).onCompletion { expectUnreached() } }
.onCompletion { expect(6) }
launch {
expect(1)
flow.collect()
}
launch {
expect(2)
yield()
job.cancel()
}
}
} catch (e: CancellationException) {
finish(7)
}
}
@Test
fun testCancellationExceptionDownstream() = runTest {
val flow = flow {
emit(1)
hang { expect(2) }
}.flatMapMerge {
flow {
emit(it)
expect(1)
throw CancellationException("")
}
}
assertFailsWith<CancellationException>(flow)
finish(3)
}
@Test
fun testCancellationExceptionUpstream() = runTest {
val flow = flow {
expect(1)
emit(1)
expect(2)
yield()
throw CancellationException("")
}.flatMapMerge {
flow {
expect(3)
emit(it)
hang { expect(4) }
}
}
assertFailsWith<CancellationException>(flow)
finish(5)
}
@Test
fun testCancellation() = runTest {
val result = flow {
emit(1)
emit(2)
emit(3)
emit(4)
expectUnreached() // Cancelled by take
emit(5)
}.flatMapMerge(2) { v -> flow { emit(v) } }
.take(2)
.toList()
assertEquals(listOf(1, 2), result)
}
}