<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.$$1$$2
-->
<!--- KNIT ../kotlinx-coroutines-core/jvm/test/guide/.*-##\.kt -->
<!--- TEST_OUT ../kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt
// This file was automatically generated from flow.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.test

import org.junit.Test

class FlowGuideTest {
-->

**Table of contents**

<!--- TOC -->

* [Asynchronous Flow](#asynchronous-flow)
  * [Representing multiple values](#representing-multiple-values)
    * [Sequences](#sequences)
    * [Suspending functions](#suspending-functions)
    * [Flows](#flows)
  * [Flows are cold](#flows-are-cold)
  * [Flow cancellation](#flow-cancellation)
  * [Flow builders](#flow-builders)
  * [Intermediate flow operators](#intermediate-flow-operators)
    * [Transform operator](#transform-operator)
    * [Size-limiting operators](#size-limiting-operators)
  * [Terminal flow operators](#terminal-flow-operators)
  * [Flows are sequential](#flows-are-sequential)
  * [Flow context](#flow-context)
    * [Wrong emission withContext](#wrong-emission-withcontext)
    * [flowOn operator](#flowon-operator)
  * [Buffering](#buffering)
    * [Conflation](#conflation)
    * [Processing the latest value](#processing-the-latest-value)
  * [Composing multiple flows](#composing-multiple-flows)
    * [Zip](#zip)
    * [Combine](#combine)
  * [Flattening flows](#flattening-flows)
    * [flatMapConcat](#flatmapconcat)
    * [flatMapMerge](#flatmapmerge)
    * [flatMapLatest](#flatmaplatest)
  * [Flow exceptions](#flow-exceptions)
    * [Collector try and catch](#collector-try-and-catch)
    * [Everything is caught](#everything-is-caught)
  * [Exception transparency](#exception-transparency)
    * [Transparent catch](#transparent-catch)
    * [Catching declaratively](#catching-declaratively)
  * [Flow completion](#flow-completion)
    * [Imperative finally block](#imperative-finally-block)
    * [Declarative handling](#declarative-handling)
    * [Upstream exceptions only](#upstream-exceptions-only)
  * [Imperative versus declarative](#imperative-versus-declarative)
  * [Launching flow](#launching-flow)

<!--- END_TOC -->

## Asynchronous Flow

Suspending functions asynchronously return a single value, but how can you return
multiple asynchronously computed values? That is what Kotlin Flows are for.

### Representing multiple values

Multiple values can be represented in Kotlin using [collections].
For example, we can have a function `foo()` that returns a [List]
of three numbers and print them all using [forEach]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
fun foo(): List<Int> = listOf(1, 2, 3)

fun main() {
    foo().forEach { value -> println(value) }
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt).

This code outputs:

```text
1
2
3
```

<!--- TEST -->

#### Sequences

If the numbers are computed with CPU-consuming blocking code
(each computation taking 100 ms), we can represent them using a [Sequence]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    foo().forEach { value -> println(value) }
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt).

This code outputs the same numbers, but it waits 100 ms before printing each one.

<!--- TEST
1
2
3
-->

#### Suspending functions

However, this computation blocks the main thread that is running the code.
When these values are computed by asynchronous code, we can mark the `foo` function with the `suspend` modifier,
so that it can perform its work without blocking and return the result as a list:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*

//sampleStart
suspend fun foo(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    foo().forEach { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt).

This code prints the numbers after waiting for a second.

<!--- TEST
1
2
3
-->

#### Flows

Using the `List<Int>` result type means we can only return all the values at once. To represent
the stream of values that are being asynchronously computed, we can use a [`Flow<Int>`][Flow] type, just as
we would use the `Sequence<Int>` type for synchronously computed values:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to see that the main thread is not blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    foo().collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).

This code waits 100 ms before printing each number without blocking the main thread. This is verified
by printing "I'm not blocked" every 100 ms from a separate coroutine that is running in the main thread:

```text
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
```

<!--- TEST -->

Notice the following differences in the code with the [Flow] from the earlier examples:

* The builder function for the [Flow] type is called [flow].
* Code inside the `flow { ... }` builder block can suspend.
* The function `foo()` is no longer marked with the `suspend` modifier.
* Values are _emitted_ from the flow using the [emit][FlowCollector.emit] function.
* Values are _collected_ from the flow using the [collect][collect] function.

> You can replace [delay] with `Thread.sleep` in the body of `foo`'s `flow { ... }` and see that the main
thread is blocked in this case.

### Flows are cold

Flows are _cold_ streams similar to sequences: the code inside a [flow] builder does not
run until the flow is collected. This becomes clear in the following example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).

This prints:

```text
Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
```

<!--- TEST -->

That is a key reason why the `foo()` function (which returns a flow) is not marked with the `suspend` modifier.
By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
which is why we see "Flow started" printed again when we call `collect` a second time.

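The comparison with sequences can be checked directly. The following is a minimal sketch that uses only the
Kotlin standard library; the `numbers()` function and the `started` counter are hypothetical names introduced
for illustration. It counts how many times the body of a `sequence { ... }` builder starts, which suggests that,
like a flow, a sequence does nothing until it is iterated and runs its body again on every iteration:

```kotlin
// Counts how many times the sequence body has started (illustrative state, not part of any API)
var started = 0

// A cold sequence: the block runs only when the sequence is iterated
fun numbers(): Sequence<Int> = sequence {
    started++
    for (i in 1..3) {
        yield(i)
    }
}

fun main() {
    val seq = numbers() // nothing runs yet
    println("Started after creation: $started")
    println(seq.toList()) // the first iteration runs the body
    println(seq.toList()) // the second iteration runs the body again
    println("Started after two iterations: $started")
}
```

The counter stays at zero until the first `toList()` call and increases by one for each iteration, mirroring the
"Flow started" lines printed by the flow example above.
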
### Flow cancellation

Flow adheres to the general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be
cancelled when the flow is suspended in a cancellable suspending function (like [delay]) and cannot be cancelled otherwise.

The following example shows how the flow gets cancelled on timeout when running in a [withTimeoutOrNull] block
and stops executing its code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms
        foo().collect { value -> println(value) }
    }
    println("Done")
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).

Notice how only two numbers get emitted by the flow in the `foo()` function, producing the following output:

```text
Emitting 1
1
Emitting 2
2
Done
```

<!--- TEST -->

### Flow builders

The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
convenient declaration of flows:

* The [flowOf] builder defines a flow emitting a fixed set of values.
* Various collections and sequences can be converted to flows using `.asFlow()` extension functions.

So, the example that prints the numbers from 1 to 3 from a flow can be written as:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).

<!--- TEST
1
2
3
-->

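The other builders listed in this section can be used in the same way. The sketch below is only an illustration:
`fixedFlow()` and `listFlow()` are hypothetical helper names, not part of the library. It declares the same three
numbers once with [flowOf] and once by converting a `List` with `asFlow()`:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// A flow with a fixed set of values, declared with flowOf
fun fixedFlow(): Flow<Int> = flowOf(1, 2, 3)

// The same values, converted from a List with the asFlow extension
fun listFlow(): Flow<Int> = listOf(1, 2, 3).asFlow()

fun main() = runBlocking<Unit> {
    fixedFlow().collect { value -> println(value) }
    listFlow().collect { value -> println(value) }
}
```

Both flows are cold, so nothing is emitted until `collect` is called on them.
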
### Intermediate flow operators

Flows can be transformed with operators, just as you would with collections and sequences.
Intermediate operators are applied to an upstream flow and return a downstream flow.
These operators are cold, just like flows are. A call to such an operator is not
a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The basic operators have familiar names like [map] and [filter].
The important difference from sequences is that blocks of
code inside these operators can call suspending functions.

For example, a flow of incoming requests can be
mapped to results with the [map] operator, even when performing a request is a long-running
operation that is implemented by a suspending function:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).

It produces the following three lines, each line appearing after a second:

```text
response 1
response 2
response 3
```

<!--- TEST -->

#### Transform operator

Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
simple transformations like [map] and [filter], as well as implement more complex transformations.
Using the `transform` operator, we can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.

For example, using `transform` we can emit a string before performing a long-running asynchronous request
and follow it with a response:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
//sampleStart
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).

The output of this code is:

```text
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
```

<!--- TEST -->

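The claim that `transform` can imitate [map] and [filter] can be sketched as follows. The extension functions
`evenOnly()` and `asStrings()` are hypothetical names introduced for this illustration, and the library's own
`map` and `filter` are not necessarily implemented this way. A filter emits the value only when a condition holds,
while a map emits exactly one converted value per input:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Behaves like filter { it % 2 == 0 }: emit only when the predicate holds
fun Flow<Int>.evenOnly(): Flow<Int> = transform<Int, Int> { value ->
    if (value % 2 == 0) emit(value)
}

// Behaves like map { "string $it" }: emit exactly one converted value per input
fun Flow<Int>.asStrings(): Flow<String> = transform<Int, String> { value ->
    emit("string $value")
}

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .evenOnly()
        .asStrings()
        .collect { println(it) } // prints "string 2" and then "string 4"
}
```
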
#### Size-limiting operators

Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management
functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).

The output of this code clearly shows that the execution of the `flow { ... }` body in the `numbers()` function
stopped after emitting the second number:

```text
1
2
Finally in numbers
```

<!--- TEST -->

### Terminal flow operators

Terminal operators on flows are _suspending functions_ that start a collection of the flow.
The [collect] operator is the most basic one, but there are other terminal operators for
convenience:

* Conversion to various collections like [toList] and [toSet].
* Operators to get the [first] value and to ensure that a flow emits a [single] value.
* Reducing a flow to a value with [reduce] and [fold].

For example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).

This prints a single number:

```text
55
```

<!--- TEST -->

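The remaining terminal operators from the list above follow the same pattern. As a brief sketch (the `numbers`
variable is a name chosen for this illustration), each call below is a suspending function that collects the
flow from the start:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val numbers = (1..5).asFlow()

    println(numbers.toList()) // collects all values into a List: [1, 2, 3, 4, 5]
    println(numbers.first()) // takes the first value and stops collecting: 1
    println(numbers.fold(10) { acc, value -> acc + value }) // 10 + 1 + 2 + 3 + 4 + 5 = 25
    println(flowOf(42).single()) // the only value of a one-element flow: 42
}
```

Because the flow is cold, every terminal operator here triggers a separate collection of `numbers`.
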
### Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate
on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator.
No new coroutines are launched by default.
Each emitted value is processed by all the intermediate operators from
upstream to downstream and is then delivered to the terminal operator.

See the following example that filters the even integers and maps them to strings:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).

Producing:

```text
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
```

<!--- TEST -->

### Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is
a `foo` flow, then the following code runs in the context specified
by the author of this code, regardless of the implementation details of the `foo` flow:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
withContext(context) {
    foo.collect { value ->
        println(value) // run in the specified context
    }
}
```

</div>

<!--- CLEAR -->

This property of a flow is called _context preservation_.

So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector
of the corresponding flow. For example, consider the implementation of `foo` that prints the thread
it is called on and emits three numbers:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun foo(): Flow<Int> = flow {
    log("Started foo flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> log("Collected $value") }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).

Running this code produces:

```text
[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
```

<!--- TEST FLEXIBLE_THREAD -->

Since `foo().collect` is called from the main thread, the body of `foo`'s flow is also called in the main thread.
This is the perfect default for fast-running or asynchronous code that does not care about the execution context and
does not block the caller.

#### Wrong emission withContext

However, long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default], and UI-updating
code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
to change the context in code that uses Kotlin coroutines, but code in the `flow { ... }` builder has to honor the context
preservation property and is not allowed to [emit][FlowCollector.emit] from a different context.

Try running the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    // WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).

This code produces the following exception:

<!--- TEST EXCEPTION
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
 Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
 but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
 Please refer to 'flow' documentation or use 'flowOn' instead
 at ...
-->

> Note that we had to use the fully qualified name of the [kotlinx.coroutines.withContext][withContext] function in this example to
demonstrate this exception. The short name `withContext` would have resolved to a special stub function that
produces a compilation error to prevent us from running into this problem.

#### flowOn operator

The exception refers to the [flowOn] function, which should be used to change the context of flow emission.
The correct way to change the context of a flow is shown in the example below, which also prints the
names of the corresponding threads to show how it all works:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        log("Collected $value")
    }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).

Notice how `flow { ... }` works in the background thread, while collection happens in the main thread:

<!--- TEST FLEXIBLE_THREAD
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
-->

Another thing to observe here is that the [flowOn] operator has changed the default sequential nature of the flow.
Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The [flowOn] operator
creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context.

### Buffering

Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes
to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
the emission by the `foo()` flow is slow, taking 100 ms to produce an element, and the collector is also slow,
taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        foo().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).

It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):

```text
1
2
3
Collected in 1220 ms
```

<!--- TEST ARBITRARY_TIME -->

823
824We can use [buffer] operator on a flow to run emitting code of `foo()` concurrently with collecting code,
825as opposed to running them sequentially:
826
827<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
828
829```kotlin
830import kotlinx.coroutines.*
831import kotlinx.coroutines.flow.*
832import kotlin.system.*
833
834fun foo(): Flow<Int> = flow {
835 for (i in 1..3) {
836 delay(100) // pretend we are asynchronously waiting 100 ms
837 emit(i) // emit next value
838 }
839}
840
841fun main() = runBlocking<Unit> {
842//sampleStart
843 val time = measureTimeMillis {
844 foo()
845 .buffer() // buffer emissions, don't wait
846 .collect { value ->
847 delay(300) // pretend we are processing it for 300 ms
848 println(value)
849 }
850 }
851 println("Collected in $time ms")
852//sampleEnd
853}
854```
855
856</div>
857
858> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).
859
860It produces the same numbers faster, as we have effectively created a processing pipeline,
861only having to wait 100 ms for the first number and then spending only 300 ms to process
862each number. This way it takes around 1000 ms to run:
863
864```text
8651
8662
8673
868Collected in 1071 ms
869```
870
871<!--- TEST ARBITRARY_TIME -->
872
873> Note that [flowOn] operator uses the same buffering mechanism when it has to change [CoroutineDispatcher],
874but here we explicitly request buffering without changing execution context.
875
#### Conflation

When a flow represents partial results of an operation or operation status updates, it may not be necessary
to process each value, but instead only the most recent ones. In this case, the [conflate] operator can be used to skip
intermediate values when a collector is too slow to process them. Building on the previous example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        foo()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).

We see that while the first number was still being processed, the second and third were already produced, so
the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:

```text
1
3
Collected in 758 ms
```

<!--- TEST ARBITRARY_TIME -->

#### Processing the latest value

Conflation is one way to speed up processing when both the emitter and collector are slow. It does so by dropping emitted values.
The other way is to cancel a slow collector and restart it every time a new value is emitted. There is
a family of `xxxLatest` operators that perform the same essential logic as the `xxx` operator, but cancel the
code in their block on a new value. Let's try changing [conflate] to [collectLatest] in the previous example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        foo()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).

Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
is run on every value, but completes only for the last value:

```text
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
```

<!--- TEST ARBITRARY_TIME -->

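Another member of the `xxxLatest` family is `mapLatest`, which applies the same idea to the `map` operator.
The following is a sketch under stated assumptions: `latestResults()` is a hypothetical helper name, and
`mapLatest` may be marked as experimental depending on the library version, so a compiler warning or an opt-in
might be needed. The mapping block takes 300 ms while values arrive every 100 ms, so the block is cancelled for
the first two values and is expected to complete only for the last one:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

// The mapping block is cancelled and restarted whenever a new value arrives
fun latestResults(): Flow<String> = foo().mapLatest { value ->
    delay(300) // pretend the mapping takes 300 ms
    "Mapped $value"
}

fun main() = runBlocking<Unit> {
    latestResults().collect { println(it) }
}
```

With these timings the collector typically receives a single string for the value 3, because the mapping of the
earlier values is cancelled before its `delay` finishes.
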
### Composing multiple flows

There are several ways to compose multiple flows.

#### Zip

Just like the [Sequence.zip] extension function in the Kotlin standard library,
flows have a [zip] operator that combines the corresponding values of two flows:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).

This example prints:

```text
1 -> one
2 -> two
3 -> three
```

<!--- TEST -->

#### Combine

When a flow represents the most recent value of a variable or operation (see also the related
section on [conflation](#conflation)), it might be necessary to perform a computation that depends on
the most recent values of the corresponding flows and to recompute it whenever any of the upstream
flows emits a value. The corresponding family of operators is called [combine].

For example, if the numbers in the previous example update every 300 ms, but the strings update every 400 ms,
then zipping them using the [zip] operator still produces the same result,
although the results are printed every 400 ms:

> We use the [onEach] intermediate operator in this example to delay each element and make the code
that emits sample flows more declarative and shorter.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).

<!--- TEST ARBITRARY_TIME
1 -> one at 437 ms from start
2 -> two at 837 ms from start
3 -> three at 1243 ms from start
-->

1061However, using [combine] operator here instead of [zip]:
1062
1063<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1064
1065```kotlin
1066import kotlinx.coroutines.*
1067import kotlinx.coroutines.flow.*
1068
1069fun main() = runBlocking<Unit> {
1070//sampleStart
1071 val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
1072 val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
1073 val startTime = currentTimeMillis() // remember the start time
1074 nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
1075 .collect { value -> // collect and print
1076 println("$value at ${System.currentTimeMillis() - startTime} ms from start")
1077 }
1078//sampleEnd
1079}
1080```
1081
1082</div>
1083
1084> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).
1085
1086We get quite a different output, where a line is printed at each emission from either `nums` or `strs` flows:
1087
1088```text
10891 -> one at 452 ms from start
10902 -> one at 651 ms from start
10912 -> two at 854 ms from start
10923 -> two at 952 ms from start
10933 -> three at 1256 ms from start
1094```
1095
1096<!--- TEST ARBITRARY_TIME -->
1097
### Flattening flows

Flows represent asynchronously received sequences of values, so it is quite easy to get into a situation where
each value triggers a request for another sequence of values. For example, we can have the following
function that returns a flow of two strings 500 ms apart:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}
```

</div>

<!--- CLEAR -->

Now if we have a flow of three integers and call `requestFlow` for each of them like this:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
(1..3).asFlow().map { requestFlow(it) }
```

</div>

<!--- CLEAR -->

Then we end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for
further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
operators for this purpose. However, the asynchronous nature of flows calls for different _modes_ of flattening,
so there is a family of flattening operators on flows.

#### flatMapConcat

The concatenating mode is implemented by the [flatMapConcat] and [flattenConcat] operators. They are the most direct
analogues of the corresponding sequence operators. They wait for the inner flow to complete before
starting to collect the next one, as the following example shows:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).

The sequential nature of [flatMapConcat] is clearly seen in the output:

```text
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
```

<!--- TEST ARBITRARY_TIME -->

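The example above uses only [flatMapConcat]. As a minimal sketch of its sibling, assuming the same `requestFlow` function, [flattenConcat] can be applied to a flow of flows that already exists. The helper name `flattenedRequests` is made up for this illustration and is not part of the library:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

// Hypothetical helper: builds the nested flow first, then flattens it one inner flow at a time
fun flattenedRequests(): Flow<String> {
    val nested: Flow<Flow<String>> = (1..3).asFlow().map { requestFlow(it) }
    return nested.flattenConcat()
}

fun main() = runBlocking<Unit> {
    flattenedRequests().collect { value -> println(value) }
}
```

Because each inner flow is collected to completion before the next one starts, the values should arrive in the same order as in the `flatMapConcat` example.
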
#### flatMapMerge

Another flattening mode is to concurrently collect all the incoming flows and merge their values into
a single flow so that values are emitted as soon as possible.
It is implemented by the [flatMapMerge] and [flattenMerge] operators. They both accept an optional
`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
(it is equal to [DEFAULT_CONCURRENCY] by default).

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).

The concurrent nature of [flatMapMerge] is obvious:

```text
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapMerge] calls its block of code (`{ requestFlow(it) }` in this example) sequentially, but
collects the resulting flows concurrently, so it is equivalent to performing a sequential
`map { requestFlow(it) }` first and then calling [flattenMerge] on the result.

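The optional `concurrency` parameter mentioned above can be sketched as follows. This is only an illustration: the helper `mergedRequests` is a hypothetical name introduced here, and the sketch assumes the same `requestFlow` function as the example above:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

// Hypothetical helper: collects at most `concurrency` inner flows at the same time
fun mergedRequests(concurrency: Int): Flow<String> =
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge(concurrency = concurrency) { requestFlow(it) }

fun main() = runBlocking<Unit> {
    // With a limit of 2, the third inner flow has to wait until one of the first two completes
    mergedRequests(2).collect { value -> println(value) }
}
```

With `concurrency = 1` only one inner flow is collected at a time, so the values arrive in the same order as with [flatMapConcat].
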
#### flatMapLatest

In a similar way to the [collectLatest] operator, which was shown in the
["Processing the latest value"](#processing-the-latest-value) section, there is a corresponding "Latest"
flattening mode where the collection of the previous flow is cancelled as soon as a new flow is emitted.
It is implemented by the [flatMapLatest] operator.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).

The output of this example shows how [flatMapLatest] works:

```text
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) on a new value.
It makes no difference in this particular example, because the call to `requestFlow` itself is fast, non-suspending,
and cannot be cancelled. However, the difference would show up if we used suspending functions like `delay` in there.

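To illustrate the note above, here is a small sketch in which the block passed to [flatMapLatest] suspends. The helper name `latestRequests` and the `delay(200)` standing in for slow preparation work are assumptions made for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

// Hypothetical helper: the block itself suspends for longer than the gap between numbers
fun latestRequests(): Flow<String> =
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { i ->
            delay(200) // cancelled when the next number arrives 100 ms later
            requestFlow(i)
        }

fun main() = runBlocking<Unit> {
    latestRequests().collect { value -> println(value) }
}
```

Under these timings the blocks for 1 and 2 are cancelled while they are still suspended in `delay`, so `requestFlow` is called only for 3 and only `3: First` and `3: Second` are collected.
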
### Flow exceptions

Flow collection can complete with an exception when an emitter or code inside the operators throws an exception.
There are several ways to handle these exceptions.

#### Collector try and catch

A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).

This code successfully catches an exception in the [collect] terminal operator and,
as you can see, no more values are emitted after that:

```text
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
```

<!--- TEST -->

#### Everything is caught

The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operator.
For example, let us change the code so that emitted values are [mapped][map] to strings,
but the corresponding code produces an exception:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).

This exception is still caught and collection is stopped:

```text
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
```

<!--- TEST -->

### Exception transparency

But how can the code of the emitter encapsulate its exception handling behavior?

Flows must be _transparent to exceptions_, and it is a violation of exception transparency to [emit][FlowCollector.emit] values in the
`flow { ... }` builder from inside of a `try/catch` block. This guarantees that a collector throwing an exception
can always catch it using `try/catch` as in the previous example.

The emitter can use a [catch] operator that preserves this exception transparency and allows encapsulation
of its exception handling. The body of the `catch` operator can analyze an exception
and react to it in different ways depending on which exception was caught:

* Exceptions can be rethrown using `throw`.
* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
* Exceptions can be ignored, logged, or processed by some other code.

For example, let us emit some text on catching an exception:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).

The output of the example is the same, even though we do not have `try/catch` around the code anymore.

<!--- TEST
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
-->

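The three reactions listed above can be sketched in a single `catch` body. This is only an illustration: `failingFlow` and `handled` are hypothetical helpers introduced here, and the mapping of exception types to reactions is an arbitrary assumption:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.io.IOException

// Hypothetical source that emits one value and then fails with the given exception
fun failingFlow(failure: Throwable): Flow<String> = flow {
    emit("value")
    throw failure
}

// Hypothetical helper that picks a reaction depending on the exception type
fun Flow<String>.handled(): Flow<String> = catch { e ->
    when (e) {
        is IOException -> emit("fallback after $e") // turn the exception into a value
        is IllegalStateException -> println("Ignored $e") // log and ignore
        else -> throw e // rethrow anything unexpected
    }
}

fun main() = runBlocking<Unit> {
    failingFlow(IOException("network down")).handled().collect { println(it) }
    failingFlow(IllegalStateException("bad state")).handled().collect { println(it) }
    try {
        failingFlow(UnsupportedOperationException("not supported")).handled().collect { println(it) }
    } catch (e: UnsupportedOperationException) {
        println("Rethrown $e") // the rethrown exception reaches the collector
    }
}
```

In all three cases the flow stops after the failure; the reactions differ only in what the collector observes: an extra value, a normal completion, or the rethrown exception.
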
#### Transparent catch

The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
(that is, exceptions from all the operators above `catch`, but not below it).
If the block in `collect { ... }` (placed below `catch`) throws an exception, then it escapes:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).

The "Caught ..." message is not printed despite the `catch` operator:

<!--- TEST EXCEPTION
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at ...
-->

#### Catching declaratively

We can combine the declarative nature of the [catch] operator with the desire to handle all exceptions by moving the body
of the [collect] operator into [onEach] and putting it before the `catch` operator. Collection of this flow must
be triggered by a call to `collect()` without parameters:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).

Now we can see that a "Caught ..." message is printed, so we can catch all exceptions without explicitly
using a `try/catch` block:

<!--- TEST EXCEPTION
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
-->

### Flow completion

When flow collection completes (normally or exceptionally), it may be necessary to execute some action.
As you may have already noticed, this can be done in two ways: imperative or declarative.

#### Imperative finally block

In addition to `try`/`catch`, a collector can also use a `finally` block to execute an action
upon `collect` completion.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).

This code prints three numbers produced by the `foo()` flow followed by a "Done" string:

```text
1
2
3
Done
```

<!--- TEST -->

#### Declarative handling

For the declarative approach, flow has the [onCompletion] intermediate operator, which is invoked
when the flow is completely collected.

The previous example can be rewritten using the [onCompletion] operator and produces the same output:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).

<!--- TEST
1
2
3
Done
-->

The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
to determine whether the flow collection completed normally or exceptionally. In the following
example the `foo()` flow throws an exception after emitting the number 1:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).

As you may expect, it prints:

```text
1
Flow completed exceptionally
Caught exception
```

<!--- TEST -->

The [onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
and can be handled with a `catch` operator.

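The way the exception travels past several `onCompletion` operators until a `catch` handles it can be sketched by recording what each operator observes. The helper `completionLog` and the exception message are assumptions made for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException("boom")
}

// Hypothetical helper: records what each operator in the chain observes
suspend fun completionLog(): List<String> {
    val log = mutableListOf<String>()
    foo()
        .onCompletion { cause -> log += "first onCompletion saw ${cause?.message}" }
        .onCompletion { cause -> log += "second onCompletion saw ${cause?.message}" }
        .catch { cause -> log += "catch handled ${cause.message}" }
        .onCompletion { cause -> log += "after catch: cause is $cause" }
        .collect { value -> log += "collected $value" }
    return log
}

fun main() = runBlocking<Unit> {
    completionLog().forEach { println(it) }
}
```

Both `onCompletion` operators placed above `catch` observe the exception, while the one placed below it observes a `null` cause, because by then the exception has already been handled upstream.
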
#### Upstream exceptions only

Just like the [catch] operator, [onCompletion] only sees exceptions coming from upstream and does not
see downstream exceptions. For example, run the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).

We can see that the completion cause is null, yet collection failed with an exception:

```text
1
Flow completed with null
Exception in thread "main" java.lang.IllegalStateException: Collected 2
```

<!--- TEST EXCEPTION -->

### Imperative versus declarative

Now we know how to collect a flow and handle its completion and exceptions in both imperative and declarative ways.
The natural question here is which approach should be preferred and why.
As a library, we do not advocate for any particular approach and believe that both options
are valid and should be selected according to your own preferences and code style.

### Launching flow

It is convenient to use flows to represent asynchronous events that are coming from some source.
In this case, we need an analogue of the `addEventListener` function that registers a piece of code with a reaction
to incoming events and continues further work. The [onEach] operator can serve this role.
However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow.
Otherwise, just calling `onEach` has no effect.

If we use the [collect] terminal operator after `onEach`, then the code after it waits until the flow is collected:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).

As you can see, it prints:

```text
Event: 1
Event: 2
Event: 3
Done
```

<!--- TEST -->

The [launchIn] terminal operator comes in handy here. By replacing `collect` with `launchIn` we can
launch the collection of the flow in a separate coroutine, so that execution of further code
immediately continues:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
//sampleEnd
```

</div>

> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).

It prints:

```text
Done
Event: 1
Event: 2
Event: 3
```

<!--- TEST -->

The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is
launched. In the above example this scope comes from the [runBlocking]
coroutine builder, so while the flow is running, this [runBlocking] scope waits for completion of its child coroutine
and keeps the main function from returning and terminating this example.

In real applications a scope comes from an entity with a limited
lifetime. As soon as the lifetime of this entity is terminated, the corresponding scope is cancelled, cancelling
the collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works
like `addEventListener`. However, there is no need for the corresponding `removeEventListener` function,
as cancellation and structured concurrency serve this purpose.

Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] only the corresponding flow collection
coroutine without cancelling the whole scope, or to [join][Job.join] it.

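As a rough sketch of using the returned [Job], the following code stops listening to an endless flow of events without cancelling the surrounding scope. The endless `events()` flow and the helper `listenFor` are hypothetical names introduced for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate an endless flow of events, one every 100 ms
fun events(): Flow<Int> = flow {
    var i = 1
    while (true) {
        delay(100)
        emit(i++)
    }
}

// Hypothetical helper: listens in the given scope, then cancels only the collecting coroutine
suspend fun listenFor(scope: CoroutineScope, millis: Long): List<Int> {
    val received = mutableListOf<Int>()
    val job: Job = events()
        .onEach { event -> received += event }
        .launchIn(scope) // returns the Job of the collecting coroutine
    delay(millis) // let a few events through
    job.cancel() // stop listening; the scope itself is not cancelled
    job.join() // wait until the collecting coroutine has finished
    return received
}

fun main() = runBlocking<Unit> {
    val received = listenFor(this, 350)
    println("Received $received, scope is still active: $isActive")
}
```

Cancelling the job plays the role of `removeEventListener` for this single subscription, while other coroutines launched in the same scope keep running.
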
<!-- stdlib references -->

[collections]: https://kotlinlang.org/docs/reference/collections-overview.html
[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/index.html
[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/index.html
[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Dispatchers.Main]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
<!--- INDEX kotlinx.coroutines.flow -->
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
[FlowCollector.emit]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
[collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
[flowOf]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
[map]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
[filter]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
[transform]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
[take]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
[toList]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
[toSet]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
[first]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
[single]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
[reduce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
[fold]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
[flowOn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
[buffer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
[conflate]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
[collectLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
[zip]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
[combine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
[onEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
[flatMapConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
[flattenConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
[flatMapMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
[flattenMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
[DEFAULT_CONCURRENCY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
[flatMapLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
<!--- END -->