blob: 7774d17ed1981aba3d242f30b37fc5b2daa743a4 [file] [log] [blame]
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.operators.*
import kotlin.test.*
class FlattenMergeTest : FlatMapMergeBaseTest() {
override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = map(mapper).flattenMerge()
@Test
override fun testFlatMapConcurrency() = runTest {
var concurrentRequests = 0
val flow = (1..100).asFlow().map() { value ->
flow {
++concurrentRequests
emit(value)
delay(Long.MAX_VALUE)
}
}.flattenMerge(concurrency = 2)
val consumer = launch {
flow.collect { value ->
expect(value)
}
}
repeat(4) {
yield()
}
assertEquals(2, concurrentRequests)
consumer.cancelAndJoin()
finish(3)
}
}