blob: 7774d17ed1981aba3d242f30b37fc5b2daa743a4 [file] [log] [blame]
Vsevolod Tolstopyatov87884882019-04-09 18:36:22 +03001/*
2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5package kotlinx.coroutines.flow
6
7import kotlinx.coroutines.*
8import kotlinx.coroutines.flow.operators.*
9import kotlin.test.*
10
11class FlattenMergeTest : FlatMapMergeBaseTest() {
12
13 override fun <T> Flow<T>.flatMap(mapper: suspend (T) -> Flow<T>): Flow<T> = map(mapper).flattenMerge()
14
15 @Test
16 override fun testFlatMapConcurrency() = runTest {
17 var concurrentRequests = 0
18 val flow = (1..100).asFlow().map() { value ->
19 flow {
20 ++concurrentRequests
21 emit(value)
22 delay(Long.MAX_VALUE)
23 }
24 }.flattenMerge(concurrency = 2)
25
26 val consumer = launch {
27 flow.collect { value ->
28 expect(value)
29 }
30 }
31
32 repeat(4) {
33 yield()
34 }
35
36 assertEquals(2, concurrentRequests)
37 consumer.cancelAndJoin()
38 finish(3)
39 }
40}