blob: d739f1a64f315f0b0c760dca6206d67cf21edb01 [file] [log] [blame]
Vsevolod Tolstopyatovd5478b62019-06-06 11:43:31 +03001/*
2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5package kotlinx.coroutines.flow
6
7import kotlinx.coroutines.*
8import kotlinx.coroutines.channels.*
9import kotlin.test.*
10
11class ScanTest : TestBase() {
12 @Test
13 fun testScan() = runTest {
14 val flow = flowOf(1, 2, 3, 4, 5)
15 val result = flow.scanReduce { acc, v -> acc + v }.toList()
16 assertEquals(listOf(1, 3, 6, 10, 15), result)
17 }
18
19 @Test
20 fun testScanWithInitial() = runTest {
21 val flow = flowOf(1, 2, 3)
22 val result = flow.scan(emptyList<Int>()) { acc, value -> acc + value }.toList()
23 assertEquals(listOf(emptyList(), listOf(1), listOf(1, 2), listOf(1, 2, 3)), result)
24 }
25
26 @Test
27 fun testNulls() = runTest {
28 val flow = flowOf(null, 2, null, null, null, 5)
29 val result = flow.scanReduce { acc, v -> if (v == null) acc else (if (acc == null) v else acc + v) }.toList()
30 assertEquals(listOf(null, 2, 2, 2, 2, 7), result)
31 }
32
33 @Test
34 fun testEmptyFlow() = runTest {
35 val result = emptyFlow<Int>().scanReduce { _, _ -> 1 }.toList()
36 assertTrue(result.isEmpty())
37 }
38
39 @Test
40 fun testErrorCancelsUpstream() = runTest {
41 expect(1)
42 val latch = Channel<Unit>()
43 val flow = flow {
44 coroutineScope {
45 launch {
46 latch.send(Unit)
47 hang { expect(3) }
48 }
49 emit(1)
50 emit(2)
51 }
52 }.scanReduce { _, value ->
53 expect(value) // 2
54 latch.receive()
55 throw TestException()
56 }.onErrorCollect(emptyFlow())
57
58 assertEquals(1, flow.single())
59 finish(4)
60 }
61
62 public operator fun <T> Collection<T>.plus(element: T): List<T> {
63 val result = ArrayList<T>(size + 1)
64 result.addAll(this)
65 result.add(element)
66 return result
67 }
68}