blob: fe37ae67de410972e8c0d49d7275466d83a67532 [file] [log] [blame]
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5
6package kotlinx.coroutines.flow.operators
7
8import kotlinx.coroutines.*
9import kotlinx.coroutines.channels.*
10import kotlinx.coroutines.flow.*
11import kotlin.test.*
12
/**
 * Shared failure and cancellation scenarios for merge-style `flatMap` operators.
 *
 * Every scenario goes through the `flatMap` extension, which is not declared in this class
 * (presumably supplied by [FlatMapBaseTest] and bound to a concrete operator by subclasses —
 * confirm against the base class). The numbered `expect(n)` / `finish(n)` calls spell out the
 * order in which the steps of each scenario are intended to run.
 */
abstract class FlatMapMergeBaseTest : FlatMapBaseTest() {

    /**
     * An inner flow that throws [TestException] fails the terminal operator while a sibling
     * inner flow is parked in `hang`.
     *
     * The upstream emits `1` and `2`. The inner flow for `1` only calls `hang` with a block
     * containing `expect(6)`; the inner flow for `2` throws. Per the step numbers, the upstream
     * body completes (step 4) before the throwing inner flow starts (step 5), and the `hang`
     * block (step 6) runs before the test finishes (step 7).
     */
    @Test
    fun testFailureCancellation() = runTest {
        val merged = flow {
            expect(2)
            emit(1)
            expect(3)
            emit(2)
            expect(4)
        }.flatMap { value ->
            when (value) {
                1 -> flow {
                    // NOTE(review): `hang` is defined elsewhere; presumably it suspends until
                    // cancelled and then invokes the block.
                    hang { expect(6) }
                }
                else -> flow<Int> {
                    expect(5)
                    throw TestException()
                }
            }
        }

        expect(1)
        assertFailsWith<TestException> { merged.singleOrNull() }
        finish(7)
    }

    /**
     * The mapper throws [TestException] for the second value while the inner flow created for
     * the first value throws `TestException2` from its `hang` block; the test asserts that
     * [TestException] is the failure reported for the resulting flow.
     *
     * A channel is used as a handshake: the mapper call for `2` (step 4) receives from it and
     * proceeds to throw (step 6) only after the inner flow for `1` has started and sent (step 5).
     */
    @Test
    fun testConcurrentFailure() = runTest {
        val gate = Channel<Unit>()
        val merged = flow {
            expect(2)
            emit(1)
            expect(3)
            emit(2)
        }.flatMap { value ->
            when (value) {
                1 -> flow<Int> {
                    expect(5)
                    gate.send(Unit)
                    hang {
                        expect(7)
                        throw TestException2()
                    }
                }
                else -> {
                    // This branch never produces a flow: the mapper itself fails.
                    expect(4)
                    gate.receive()
                    expect(6)
                    throw TestException()
                }
            }
        }

        expect(1)
        assertFailsWith<TestException>(merged)
        finish(8)
    }

    /**
     * The mapper throws [TestException] for the second value while the inner flow created for
     * the first value is parked in `hang`; the upstream must not continue past `emit(2)`.
     *
     * Same handshake as in [testConcurrentFailure], except that the `hang` block only records
     * step 7 and the upstream ends with `expectUnreached()`.
     */
    @Test
    fun testFailureInMapOperationCancellation() = runTest {
        val gate = Channel<Unit>()
        val merged = flow {
            expect(2)
            emit(1)
            expect(3)
            emit(2)
            expectUnreached()
        }.flatMap { value ->
            when (value) {
                1 -> flow<Int> {
                    expect(5)
                    gate.send(Unit)
                    hang { expect(7) }
                }
                else -> {
                    // This branch never produces a flow: the mapper itself fails.
                    expect(4)
                    gate.receive()
                    expect(6)
                    throw TestException()
                }
            }
        }

        expect(1)
        assertFailsWith<TestException> { merged.count() }
        finish(8)
    }

    /** Concurrency scenario that each concrete operator test must implement. */
    @Test
    abstract fun testFlatMapConcurrency()
}
95}