<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.$$1$$2
-->
<!--- KNIT ../kotlinx-coroutines-core/jvm/test/guide/.*-##\.kt -->
<!--- TEST_OUT ../kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt
// This file was automatically generated from flow.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.test

import org.junit.Test

class FlowGuideTest {
-->

**Table of contents**

<!--- TOC -->

* [Asynchronous Flow](#asynchronous-flow)
  * [Representing multiple values](#representing-multiple-values)
    * [Sequences](#sequences)
    * [Suspending functions](#suspending-functions)
    * [Flows](#flows)
  * [Flows are cold](#flows-are-cold)
  * [Flow cancellation](#flow-cancellation)
  * [Flow builders](#flow-builders)
  * [Intermediate flow operators](#intermediate-flow-operators)
    * [Transform operator](#transform-operator)
    * [Size-limiting operators](#size-limiting-operators)
  * [Terminal flow operators](#terminal-flow-operators)
  * [Flows are sequential](#flows-are-sequential)
  * [Flow context](#flow-context)
    * [Wrong emission withContext](#wrong-emission-withcontext)
    * [flowOn operator](#flowon-operator)
  * [Buffering](#buffering)
    * [Conflation](#conflation)
    * [Processing the latest value](#processing-the-latest-value)
  * [Composing multiple flows](#composing-multiple-flows)
    * [Zip](#zip)
    * [Combine](#combine)
  * [Flattening flows](#flattening-flows)
    * [flatMapConcat](#flatmapconcat)
    * [flatMapMerge](#flatmapmerge)
    * [flatMapLatest](#flatmaplatest)
  * [Flow exceptions](#flow-exceptions)
    * [Collector try and catch](#collector-try-and-catch)
    * [Everything is caught](#everything-is-caught)
  * [Exception transparency](#exception-transparency)
    * [Transparent catch](#transparent-catch)
    * [Catching declaratively](#catching-declaratively)
  * [Flow completion](#flow-completion)
    * [Imperative finally block](#imperative-finally-block)
    * [Declarative handling](#declarative-handling)
    * [Upstream exceptions only](#upstream-exceptions-only)
  * [Imperative versus declarative](#imperative-versus-declarative)
  * [Launching flow](#launching-flow)
  * [Flow and Reactive Streams](#flow-and-reactive-streams)

<!--- END_TOC -->

## Asynchronous Flow

A suspending function asynchronously returns a single value, but how can we return
multiple asynchronously computed values? This is where Kotlin Flows come in.

### Representing multiple values

Multiple values can be represented in Kotlin using [collections].
For example, we can have a function `foo()` that returns a [List]
of three numbers and then print them all using [forEach]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
fun foo(): List<Int> = listOf(1, 2, 3)

fun main() {
    foo().forEach { value -> println(value) }
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt).

This code outputs:

```text
1
2
3
```

<!--- TEST -->

#### Sequences

If we are computing the numbers with some CPU-consuming blocking code
(each computation taking 100 ms), then we can represent the numbers using a [Sequence]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    foo().forEach { value -> println(value) }
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt).

This code outputs the same numbers, but it waits 100 ms before printing each one.

<!--- TEST
1
2
3
-->

#### Suspending functions

However, this computation blocks the main thread that is running the code.
When these values are computed by asynchronous code, we can mark the function `foo` with a `suspend` modifier,
so that it can perform its work without blocking and return the result as a list:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*

//sampleStart
suspend fun foo(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    foo().forEach { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt).

This code prints the numbers after waiting for a second.

<!--- TEST
1
2
3
-->

#### Flows

Using the `List<Int>` result type means we can only return all the values at once. To represent
the stream of values that are being asynchronously computed, we can use a [`Flow<Int>`][Flow] type, just as we would use the `Sequence<Int>` type for synchronously computed values:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    foo().collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).

This code waits 100 ms before printing each number without blocking the main thread. This is verified
by printing "I'm not blocked" every 100 ms from a separate coroutine that is running in the main thread:

```text
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
```

<!--- TEST -->

Notice the following differences between the code using [Flow] and the earlier examples:

* A builder function for the [Flow] type is called [flow].
* Code inside the `flow { ... }` builder block can suspend.
* The function `foo()` is no longer marked with the `suspend` modifier.
* Values are _emitted_ from the flow using the [emit][FlowCollector.emit] function.
* Values are _collected_ from the flow using the [collect][collect] function.

> We can replace [delay] with `Thread.sleep` in the body of `foo`'s `flow { ... }` and see that the main
thread is blocked in this case.

### Flows are cold

Flows are _cold_ streams similar to sequences: the code inside a [flow] builder does not
run until the flow is collected. This becomes clear in the following example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).

Which prints:

```text
Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
```

<!--- TEST -->

This is a key reason the `foo()` function (which returns a flow) is not marked with the `suspend` modifier.
By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
which is why we see "Flow started" when we call `collect` again.

### Flow cancellation

Flow adheres to the general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be
cancelled when the flow is suspended in a cancellable suspending function (like [delay]), and cannot be cancelled otherwise.

The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
and stops executing its code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms
        foo().collect { value -> println(value) }
    }
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).

Notice how only two numbers get emitted by the flow in the `foo()` function, producing the following output:

```text
Emitting 1
1
Emitting 2
2
Done
```

<!--- TEST -->

### Flow builders

The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
easier declaration of flows:

* [flowOf] builder that defines a flow emitting a fixed set of values.
* Various collections and sequences can be converted to flows using `.asFlow()` extension functions.

So, the example that prints the numbers from 1 to 3 from a flow can be written as:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).

<!--- TEST
1
2
3
-->
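
The other builder listed above, `flowOf`, and the `.asFlow()` extension on a collection can be sketched in the same way. This is a minimal illustration, not part of the guide's generated samples; the helper names `fixedNumbers` and `namesFlow` are hypothetical and exist only for this example:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: a flow built from a fixed set of values
fun fixedNumbers(): Flow<Int> = flowOf(1, 2, 3)

// Hypothetical helper: an existing collection converted to a flow
fun namesFlow(): Flow<String> = listOf("a", "b", "c").asFlow()

fun main() = runBlocking<Unit> {
    fixedNumbers().collect { value -> println(value) }
    namesFlow().collect { value -> println(value) }
}
```

Both flows are cold like any other flow, so nothing is emitted until `collect` is called.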

### Intermediate flow operators

Flows can be transformed with operators, just as you would with collections and sequences.
Intermediate operators are applied to an upstream flow and return a downstream flow.
These operators are cold, just like flows are. A call to such an operator is not
a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The basic operators have familiar names like [map] and [filter].
The important difference from sequences is that blocks of
code inside these operators can call suspending functions.

For example, a flow of incoming requests can be
mapped to the results with the [map] operator, even when performing a request is a long-running
operation that is implemented by a suspending function:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).

It produces the following three lines, one appearing each second:

```text
response 1
response 2
response 3
```

<!--- TEST -->
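
The same holds for `filter`: its predicate can also call suspending functions. The sketch below assumes a hypothetical suspending check `isAllowed` that stands in for a slow validation call; both it and `allowedRequests` are names invented for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical suspending check, standing in for a slow validation call
suspend fun isAllowed(request: Int): Boolean {
    delay(100) // imitate asynchronous work
    return request % 2 == 1
}

// Keeps only the requests that pass the suspending check
fun allowedRequests(): Flow<Int> =
    (1..5).asFlow().filter { request -> isAllowed(request) }

fun main() = runBlocking<Unit> {
    allowedRequests().collect { request -> println("allowed $request") }
}
```

With this predicate, only the odd requests (1, 3 and 5) reach the collector.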

#### Transform operator

Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
simple transformations like [map] and [filter], as well as implement more complex transformations.
Using the `transform` operator, we can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.

For example, using `transform` we can emit a string before performing a long-running asynchronous request
and follow it with a response:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
//sampleStart
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).

The output of this code is:

```text
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
```

<!--- TEST -->
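
To make the claim that `transform` can imitate `map` and `filter` concrete, here is one possible sketch. The extension names `mapViaTransform` and `filterViaTransform` are hypothetical and are not how the library itself defines `map` and `filter`:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical name: behaves like map, emitting exactly one value per input
fun <T, R> Flow<T>.mapViaTransform(mapper: suspend (T) -> R): Flow<R> =
    transform { value -> emit(mapper(value)) }

// Hypothetical name: behaves like filter, emitting zero or one value per input
fun <T> Flow<T>.filterViaTransform(predicate: suspend (T) -> Boolean): Flow<T> =
    transform { value -> if (predicate(value)) emit(value) }

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filterViaTransform { it % 2 == 0 }
        .mapViaTransform { "string $it" }
        .collect { println(it) }
}
```

The only difference between the two helpers is how many times `emit` is called for each incoming value, which is exactly the freedom `transform` provides.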

#### Size-limiting operators

Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management
functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).

The output of this code clearly shows that the execution of the `flow { ... }` body in the `numbers()` function
stopped after emitting the second number:

```text
1
2
Finally in numbers
```

<!--- TEST -->
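
Because `take` cancels the upstream flow, it can also be applied to a flow that would otherwise never complete. The following is a small sketch under that assumption; the `naturals` function is a hypothetical helper introduced only for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: an endless flow of natural numbers
fun naturals(): Flow<Int> = flow {
    var next = 1
    while (true) {
        emit(next++) // never completes on its own
    }
}

fun main() = runBlocking<Unit> {
    naturals()
        .take(3) // stops the upstream loop once three values have been delivered
        .collect { value -> println(value) }
    println("Done")
}
```

The program prints 1, 2 and 3 followed by "Done", since the `while (true)` loop is interrupted by the cancellation exception thrown from `emit`.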

### Terminal flow operators

Terminal operators on flows are _suspending functions_ that start a collection of the flow.
The [collect] operator is the most basic one, but there are other terminal operators that can make collection easier:

* Conversion to various collections like [toList] and [toSet].
* Operators to get the [first] value and to ensure that a flow emits a [single] value.
* Reducing a flow to a value with [reduce] and [fold].

For example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).

Prints a single number:

```text
55
```

<!--- TEST -->
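
A few of the other terminal operators from the list above can be tried in the same way. This is only an illustrative sketch; the variable names are arbitrary:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val numbers = (1..5).asFlow()

    // Each terminal operator starts its own collection of the cold flow
    val all = numbers.toList() // collects every value into a List
    val head = numbers.first() // takes the first value, then stops collecting
    val total = numbers.fold(0) { acc, value -> acc + value } // like reduce, with an initial value

    println(all)
    println(head)
    println(total)
}
```

This prints `[1, 2, 3, 4, 5]`, `1` and `15`. The same `numbers` flow is collected three times, once per terminal operator.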

### Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate
on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator.
No new coroutines are launched by default.
Each emitted value is processed by all the intermediate operators from
upstream to downstream and is then delivered to the terminal operator.

See the following example that filters the even integers and maps them to strings:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).

Producing:

```text
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
```

<!--- TEST -->

### Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is
a `foo` flow, then the following code runs in the context specified
by the author of this code, regardless of the implementation details of the `foo` flow:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
withContext(context) {
    foo.collect { value ->
        println(value) // run in the specified context
    }
}
```

</div>

<!--- CLEAR -->

This property of a flow is called _context preservation_.

So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector
of the corresponding flow. For example, consider the implementation of `foo` that prints the thread
it is called on and emits three numbers:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun foo(): Flow<Int> = flow {
    log("Started foo flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> log("Collected $value") }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).

Running this code produces:

```text
[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
```

<!--- TEST FLEXIBLE_THREAD -->

Since `foo().collect` is called from the main thread, the body of `foo`'s flow is also called in the main thread.
This is the perfect default for fast-running or asynchronous code that does not care about the execution context and
does not block the caller.

#### Wrong emission withContext

However, long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default] and UI-updating
code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
to change the context in the code using Kotlin coroutines, but code in the `flow { ... }` builder has to honor the context
preservation property and is not allowed to [emit][FlowCollector.emit] from a different context.

Try running the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).

This code produces the following exception:

<!--- TEST EXCEPTION
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
    Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
    but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
    Please refer to 'flow' documentation or use 'flowOn' instead
    at ...
-->

> Note that we had to use a fully qualified name of the [kotlinx.coroutines.withContext][withContext] function in this example to
demonstrate this exception. A short name of `withContext` would have resolved to a special stub function that
produces a compilation error to prevent us from running into this problem.

#### flowOn operator

The exception refers to the [flowOn] function, which should be used to change the context of the flow emission.
The correct way to change the context of a flow is shown in the example below, which also prints the
names of the corresponding threads to show how it all works:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        log("Collected $value")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).

Notice how `flow { ... }` works in the background thread, while collection happens in the main thread:

<!--- TEST FLEXIBLE_THREAD
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
-->

Another thing to observe here is that the [flowOn] operator has changed the default sequential nature of the flow.
Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The [flowOn] operator
creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context.
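
Since `flowOn` only affects the upstream flow, operators placed after it still run in the collector's context. The sketch below illustrates this by tagging each value with the thread name at both stages. The `taggedNumbers` helper is hypothetical, and the exact thread names depend on the environment, so no output is shown:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: tags each value with the thread that emitted it
fun taggedNumbers(): Flow<String> = flow {
    for (i in 1..3) {
        emit("$i emitted on ${Thread.currentThread().name}")
    }
}.flowOn(Dispatchers.Default) // only the code above this line moves to Dispatchers.Default

fun main() = runBlocking<Unit> {
    taggedNumbers()
        // downstream of flowOn: runs in the collector's context
        .map { "$it, mapped on ${Thread.currentThread().name}" }
        .collect { println(it) }
}
```

When run from `main`, the emitting part is expected to report a `Dispatchers.Default` worker thread while the `map` and `collect` parts report the thread that called `runBlocking`.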

### Buffering

Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes
to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
the emission by the `foo()` flow is slow, taking 100 ms to produce an element, and the collector is also slow,
taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        foo().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).

It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):

```text
1
2
3
Collected in 1220 ms
```

<!--- TEST ARBITRARY_TIME -->

David.Watsonbb714c52019-08-30 17:49:42 +0200823We can use a [buffer] operator on a flow to run emitting code of `foo()` concurrently with collecting code,
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300824as opposed to running them sequentially:
825
826<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
827
828```kotlin
829import kotlinx.coroutines.*
830import kotlinx.coroutines.flow.*
831import kotlin.system.*
832
833fun foo(): Flow<Int> = flow {
834 for (i in 1..3) {
835 delay(100) // pretend we are asynchronously waiting 100 ms
836 emit(i) // emit next value
837 }
838}
839
840fun main() = runBlocking<Unit> {
841//sampleStart
842 val time = measureTimeMillis {
843 foo()
844 .buffer() // buffer emissions, don't wait
845 .collect { value ->
846 delay(300) // pretend we are processing it for 300 ms
847 println(value)
848 }
849 }
850 println("Collected in $time ms")
851//sampleEnd
852}
853```
854
855</div>
856
David.Watsonbb714c52019-08-30 17:49:42 +0200857> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300858
David.Watsonbb714c52019-08-30 17:49:42 +0200859It produces the same numbers just faster, as we have effectively created a processing pipeline,
860having to only wait 100 ms for the first number and then spending only 300 ms to process
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300861each number. This way it takes around 1000 ms to run:
862
863```text
8641
8652
8663
867Collected in 1071 ms
868```
869
870<!--- TEST ARBITRARY_TIME -->
871
David.Watsonbb714c52019-08-30 17:49:42 +0200872> Note that the [flowOn] operator uses the same buffering mechanism when it has to change a [CoroutineDispatcher],
873but here we explicitly request buffering without changing the execution context.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300874
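To make the note above concrete, here is a minimal sketch that collects the same `foo()` flow once with `buffer()` and once with `flowOn(Dispatchers.Default)`. The helper `collectTimed` is a hypothetical name introduced only for this illustration. Both variants should collect the same three values; the timings depend on the machine, so none are asserted here.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i)
    }
}

// Hypothetical helper: collects the given flow with a slow collector and
// returns the collected values together with the elapsed time in ms.
suspend fun collectTimed(source: Flow<Int>): Pair<List<Int>, Long> {
    val values = mutableListOf<Int>()
    val time = measureTimeMillis {
        source.collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            values.add(value)
        }
    }
    return values to time
}

fun main() = runBlocking<Unit> {
    // Explicit buffering, same execution context
    val (buffered, bufferedTime) = collectTimed(foo().buffer())
    // Changing the dispatcher of the emitter, which also decouples it from the collector
    val (moved, movedTime) = collectTimed(foo().flowOn(Dispatchers.Default))
    println("buffer(): $buffered in $bufferedTime ms")
    println("flowOn(): $moved in $movedTime ms")
}
```
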
#### Conflation

When a flow represents partial results of an operation or operation status updates, it may not be necessary
to process each value, but only the most recent ones. In this case, the [conflate] operator can be used to skip
intermediate values when a collector is too slow to process them. Building on the previous example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        foo()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).

We see that while the first number was still being processed, the second and third were already produced, so
the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:

```text
1
3
Collected in 758 ms
```

<!--- TEST ARBITRARY_TIME -->

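As a small sketch of how conflation relates to buffering: the library documents `conflate()` as a shortcut for a buffer with conflated capacity, so `buffer(Channel.CONFLATED)` should behave the same way. The helper `slowCollect` below is a hypothetical name used only for illustration, and the result relies on the 100 ms and 300 ms delays used throughout this section.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // a new value every 100 ms
        emit(i)
    }
}

// Hypothetical helper: collects a flow with a 300 ms collector
// and returns the values the collector actually processed.
suspend fun slowCollect(source: Flow<Int>): List<Int> {
    val seen = mutableListOf<Int>()
    source.collect { value ->
        delay(300) // pretend we are processing it for 300 ms
        seen.add(value)
    }
    return seen
}

fun main() = runBlocking<Unit> {
    println(slowCollect(foo().conflate()))                // intermediate value 2 is skipped
    println(slowCollect(foo().buffer(Channel.CONFLATED))) // same behavior, spelled as a buffer
}
```
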
#### Processing the latest value

Conflation is one way to speed up processing when both the emitter and collector are slow. It does so by dropping emitted values.
The other way is to cancel a slow collector and restart it every time a new value is emitted. There is
a family of `xxxLatest` operators that perform the same essential logic as the corresponding `xxx` operator, but cancel the
code in their block on a new value. Let's try changing [conflate] to [collectLatest] in the previous example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        foo()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).

Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
is run on every value, but completes only for the last value:

```text
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
```

<!--- TEST ARBITRARY_TIME -->

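Another member of the `xxxLatest` family is `mapLatest`. The sketch below assumes the same `foo()` flow and a hypothetical helper `latestMapped`; it shows that a slow transformation is cancelled whenever a newer value arrives, so only the last value is mapped. Recent library versions mark `mapLatest` as experimental, so the compiler may ask for an opt-in.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // a new value every 100 ms
        emit(i)
    }
}

// Hypothetical helper: maps every value with a slow (300 ms) transformation.
// The transformation in progress is cancelled when a newer value is emitted.
suspend fun latestMapped(): List<String> =
    foo()
        .mapLatest { value ->
            delay(300) // slower than the 100 ms between emissions
            "mapped $value"
        }
        .toList()

fun main() = runBlocking<Unit> {
    println(latestMapped()) // only the transformation of the last value completes
}
```
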
### Composing multiple flows

There are lots of ways to compose multiple flows.

#### Zip

Just like the [Sequence.zip] extension function in the Kotlin standard library,
flows have a [zip] operator that combines the corresponding values of two flows:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).

This example prints:

```text
1 -> one
2 -> two
3 -> three
```

<!--- TEST -->

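One detail the example above does not show is what happens when the two flows have different lengths. The following sketch (with a hypothetical helper `zipUneven`) zips five numbers with two strings; the zipped flow completes as soon as the shorter flow completes, so only two pairs are produced.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: zips five numbers with only two strings.
suspend fun zipUneven(): List<String> {
    val nums = (1..5).asFlow()      // numbers 1..5
    val strs = flowOf("one", "two") // only two strings
    return nums.zip(strs) { a, b -> "$a -> $b" }.toList()
}

fun main() = runBlocking<Unit> {
    zipUneven().forEach { println(it) } // numbers 3..5 have no partner and are not emitted
}
```
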
#### Combine

When a flow represents the most recent value of a variable or operation (see also the related
section on [conflation](#conflation)), it might be necessary to perform a computation that depends on
the most recent values of the corresponding flows and to recompute it whenever any of the upstream
flows emits a value. The corresponding family of operators is called [combine].

For example, if the numbers in the previous example update every 300 ms, but strings update every 400 ms,
then zipping them using the [zip] operator will still produce the same result,
albeit with results that are printed every 400 ms:

> We use an [onEach] intermediate operator in this example to delay each element and make the code
that emits sample flows more declarative and shorter.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).

<!--- TEST ARBITRARY_TIME
1 -> one at 437 ms from start
2 -> two at 837 ms from start
3 -> three at 1243 ms from start
-->

However, when using a [combine] operator here instead of [zip]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).

We get quite a different output, where a line is printed at each emission from either the `nums` or `strs` flow:

```text
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
```

<!--- TEST ARBITRARY_TIME -->

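The "most recent value" behavior of `combine` is easiest to see when one of the flows emits only once. In the sketch below (the helper `combineSample` and the `unit` flow are hypothetical names for illustration), the single string is remembered and paired with every number that arrives later.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: combines a flow that emits once with a flow that keeps emitting.
suspend fun combineSample(): List<String> {
    val unit = flowOf("ms")                          // emits a single value immediately
    val nums = (1..3).asFlow().onEach { delay(100) } // numbers 1..3 every 100 ms
    // Nothing is produced until both flows have emitted at least once;
    // after that, each new number is combined with the latest (only) unit.
    return nums.combine(unit) { n, u -> "$n $u" }.toList()
}

fun main() = runBlocking<Unit> {
    println(combineSample())
}
```

With `zip` instead of `combine`, the same pair of flows would produce a single element, because `unit` has only one value to pair.
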
### Flattening flows

Flows represent asynchronously received sequences of values, so it is quite easy to get into a situation where
each value triggers a request for another sequence of values. For example, we can have the following
function that returns a flow of two strings 500 ms apart:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}
```

</div>

<!--- CLEAR -->

Now if we have a flow of three integers and call `requestFlow` for each of them like this:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
(1..3).asFlow().map { requestFlow(it) }
```

</div>

<!--- CLEAR -->

Then we end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for
further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
operators for this. However, due to the asynchronous nature of flows, they call for different _modes_ of flattening,
and so there is a family of flattening operators on flows.

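As a minimal sketch of what flattening means, the nested flow from the snippet above can be turned into a plain `Flow<String>` with one member of this family, `flattenConcat` (its mode is described in the next section). The helper name `flattened` is hypothetical, and recent library versions may report `flattenConcat` as experimental.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

// Hypothetical helper: builds a flow of flows and flattens it by concatenation.
suspend fun flattened(): List<String> {
    val nested: Flow<Flow<String>> = (1..3).asFlow().map { requestFlow(it) }
    val flat: Flow<String> = nested.flattenConcat() // inner flows are collected one after another
    return flat.toList()
}

fun main() = runBlocking<Unit> {
    flattened().forEach { println(it) }
}
```
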
#### flatMapConcat

Concatenating mode is implemented by the [flatMapConcat] and [flattenConcat] operators. They are the most direct
analogues of the corresponding sequence operators. They wait for the inner flow to complete before
starting to collect the next one, as the following example shows:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).

The sequential nature of [flatMapConcat] is clearly seen in the output:

```text
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
```

<!--- TEST ARBITRARY_TIME -->

#### flatMapMerge

Another flattening mode is to concurrently collect all the incoming flows and merge their values into
a single flow so that values are emitted as soon as possible.
It is implemented by the [flatMapMerge] and [flattenMerge] operators. They both accept an optional
`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
(it is equal to [DEFAULT_CONCURRENCY] by default).

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).

The concurrent nature of [flatMapMerge] is obvious:

```text
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapMerge] calls its block of code (`{ requestFlow(it) }` in this example) sequentially, but
collects the resulting flows concurrently. It is the equivalent of performing a sequential
`map { requestFlow(it) }` first and then calling [flattenMerge] on the result.

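The equivalence stated in the note, and the effect of the `concurrency` parameter, can be sketched as follows. The three helper functions are hypothetical names for illustration. Because merged output order depends on timing, the sketch only compares the sets of values for the concurrent variants; with `concurrency = 1` the inner flows are collected one at a time, so the order matches the concatenating mode. Recent library versions may report these operators as experimental.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

// Hypothetical helper: flatMapMerge with the default concurrency.
suspend fun viaFlatMapMerge(): List<String> =
    (1..3).asFlow().onEach { delay(100) }
        .flatMapMerge { requestFlow(it) }
        .toList()

// Hypothetical helper: the same pipeline spelled as a sequential map plus flattenMerge.
suspend fun viaMapAndFlatten(): List<String> =
    (1..3).asFlow().onEach { delay(100) }
        .map { requestFlow(it) }
        .flattenMerge()
        .toList()

// Hypothetical helper: at most one inner flow is collected at a time.
suspend fun oneAtATime(): List<String> =
    (1..3).asFlow().onEach { delay(100) }
        .flatMapMerge(concurrency = 1) { requestFlow(it) }
        .toList()

fun main() = runBlocking<Unit> {
    println(viaFlatMapMerge().sorted() == viaMapAndFlatten().sorted()) // same values either way
    println(oneAtATime()) // sequential order, as with flatMapConcat
}
```
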
#### flatMapLatest

In a similar way to the [collectLatest] operator that was shown in the
["Processing the latest value"](#processing-the-latest-value) section, there is a corresponding "Latest"
flattening mode where the collection of the previous flow is cancelled as soon as a new flow is emitted.
It is implemented by the [flatMapLatest] operator.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).

The output of this example is a good demonstration of how [flatMapLatest] works:

```text
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) on a new value.
It makes no difference in this particular example, because the call to `requestFlow` itself is fast, non-suspending,
and cannot be cancelled. However, it would show up if we were to use suspending functions like `delay` in there.

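The situation described in the note can be sketched by putting a `delay` inside the block. In this hypothetical variant (the helper name `latestRequests` and the 200 ms delay are assumptions for illustration), the block for values 1 and 2 is cancelled before it ever reaches `requestFlow`, so only the strings for value 3 are collected.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

// Hypothetical helper: the block itself suspends for 200 ms before requesting a flow.
suspend fun latestRequests(): List<String> =
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { value ->
            delay(200) // suspending work in the block; cancelled when a newer value arrives
            requestFlow(value)
        }
        .toList()

fun main() = runBlocking<Unit> {
    println(latestRequests())
}
```
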
### Flow exceptions

Flow collection can complete with an exception when an emitter or code inside the operators throws an exception.
There are several ways to handle these exceptions.

#### Collector try and catch

A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).

This code successfully catches an exception in the [collect] terminal operator and,
as we see, no more values are emitted after that:

```text
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
```

<!--- TEST -->

#### Everything is caught

The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators.
For example, let's change the code so that emitted values are [mapped][map] to strings,
but the corresponding code produces an exception:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).

This exception is still caught and collection is stopped:

```text
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
```

<!--- TEST -->

### Exception transparency

But how can the code of the emitter encapsulate its exception handling behavior?

Flows must be _transparent to exceptions_ and it is a violation of exception transparency to [emit][FlowCollector.emit] values in the
`flow { ... }` builder from inside of a `try/catch` block. This guarantees that a collector throwing an exception
can always catch it using `try/catch` as in the previous example.

The emitter can use a [catch] operator that preserves this exception transparency and allows encapsulation
of its exception handling. The body of the `catch` operator can analyze an exception
and react to it in different ways depending on which exception was caught:

* Exceptions can be rethrown using `throw`.
* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
* Exceptions can be ignored, logged, or processed by some other code.

For example, let us emit a text on catching an exception:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).

The output of the example is the same, even though we do not have `try/catch` around the code anymore.

<!--- TEST
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
-->

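The reactions listed above can also be mixed in a single `catch` body. The sketch below is illustrative only: the `readings()` flow, the `withFallback` helper, and the choice of `IOException` as the "expected" failure are all assumptions, not part of the guide's examples. An expected exception is turned into a fallback value, and anything else is rethrown.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.io.IOException

// Hypothetical flow that fails with an I/O error after its first value.
fun readings(): Flow<Int> = flow {
    emit(1)
    throw IOException("sensor offline")
}

// Hypothetical helper: reacts differently depending on which exception was caught.
suspend fun withFallback(): List<Int> =
    readings()
        .catch { e ->
            if (e is IOException) {
                emit(-1) // turn an expected failure into an emitted value
            } else {
                throw e  // rethrow anything unexpected
            }
        }
        .toList()

fun main() = runBlocking<Unit> {
    println(withFallback())
}
```
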
#### Transparent catch

The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
(that is, exceptions from all the operators above `catch`, but not below it).
If the block in `collect { ... }` (placed below `catch`) throws an exception, then it escapes:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).

A "Caught ..." message is not printed despite there being a `catch` operator:

<!--- TEST EXCEPTION
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at ...
-->

#### Catching declaratively

We can combine the declarative nature of the [catch] operator with a desire to handle all the exceptions, by moving the body
of the [collect] operator into [onEach] and putting it before the `catch` operator. Collection of this flow must
be triggered by a call to `collect()` without parameters:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).

Now we can see that a "Caught ..." message is printed and so we can catch all the exceptions without explicitly
using a `try/catch` block:

<!--- TEST EXCEPTION
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
-->

### Flow completion

When flow collection completes (normally or exceptionally), we may need to execute an action.
As you may have already noticed, it can be done in two ways: imperative or declarative.

#### Imperative finally block

In addition to `try`/`catch`, a collector can also use a `finally` block to execute an action
upon `collect` completion.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).

This code prints three numbers produced by the `foo()` flow followed by a "Done" string:

```text
1
2
3
Done
```

<!--- TEST -->

#### Declarative handling

For the declarative approach, flows have an [onCompletion] intermediate operator that is invoked
when the flow has been completely collected.

The previous example can be rewritten using an [onCompletion] operator and produces the same output:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
//sampleEnd
}
```
</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).

<!--- TEST
1
2
3
Done
-->

The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
to determine whether the flow collection was completed normally or exceptionally. In the following
example the `foo()` flow throws an exception after emitting the number 1:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
//sampleEnd
```
</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).

As you may expect, it prints:

```text
1
Flow completed exceptionally
Caught exception
```

<!--- TEST -->

The [onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
and can be handled with a `catch` operator.

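As a sketch of how the position of `onCompletion` relative to `catch` matters, the following variant of the example records what each operator observes. The helper `completionCauses` and the log messages are hypothetical. An `onCompletion` placed above `catch` sees the exception, while one placed below it sees a normal completion, because `catch` has already handled the failure.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

// Hypothetical helper: records what each operator observes, in order.
suspend fun completionCauses(): List<String> {
    val log = mutableListOf<String>()
    foo()
        .onCompletion { cause -> log.add("before catch: ${cause?.javaClass?.simpleName}") }
        .catch { log.add("caught") } // handles the upstream exception
        .onCompletion { cause -> log.add("after catch: ${cause?.javaClass?.simpleName}") }
        .collect { value -> log.add("value $value") }
    return log
}

fun main() = runBlocking<Unit> {
    completionCauses().forEach { println(it) }
}
```
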
#### Upstream exceptions only

Just like the [catch] operator, [onCompletion] only sees exceptions coming from upstream and does not
see downstream exceptions. For example, run the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).

We can see the completion cause is null, yet collection failed with an exception:

```text
1
Flow completed with null
Exception in thread "main" java.lang.IllegalStateException: Collected 2
```

<!--- TEST EXCEPTION -->

### Imperative versus declarative

Now we know how to collect a flow, and how to handle its completion and exceptions in both imperative and declarative ways.
The natural question here is, which approach is preferred and why?
As a library, we do not advocate for any particular approach and believe that both options
are valid and should be selected according to your own preferences and code style.

### Launching flow

It is easy to use flows to represent asynchronous events that are coming from some source.
In this case, we need an analogue of the `addEventListener` function that registers a piece of code with a reaction
for incoming events and continues further work. The [onEach] operator can serve this role.
However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow.
Otherwise, just calling `onEach` has no effect.

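The claim that calling `onEach` alone has no effect can be checked with a small sketch. The helper `laziness` is a hypothetical name: it attaches an `onEach` action, takes a snapshot before any terminal operator runs, and then collects the flow.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

// Hypothetical helper: returns what the onEach action has seen
// before and after a terminal operator is applied.
suspend fun laziness(): Pair<List<Int>, List<Int>> {
    val seen = mutableListOf<Int>()
    val pipeline = events().onEach { event -> seen.add(event) } // nothing runs yet
    val beforeCollect = seen.toList() // snapshot taken before collection
    pipeline.collect()                // the terminal operator triggers the flow
    return beforeCollect to seen.toList()
}

fun main() = runBlocking<Unit> {
    val (before, after) = laziness()
    println("Before collect: $before")
    println("After collect: $after")
}
```
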
David.Watsonbb714c52019-08-30 17:49:42 +02001710If we use the [collect] terminal operator after `onEach`, then the code after it will wait until the flow is collected:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001711
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).

As you can see, it prints:

```text
Event: 1
Event: 2
Event: 3
Done
```

<!--- TEST -->

The [launchIn] terminal operator comes in handy here. By replacing `collect` with `launchIn` we can
launch the collection of the flow in a separate coroutine, so that execution of further code
immediately continues:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).

It prints:

```text
Done
Event: 1
Event: 2
Event: 3
```

<!--- TEST -->

The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is
launched. In the above example this scope comes from the [runBlocking]
coroutine builder, so while the flow is running, this [runBlocking] scope waits for completion of its child coroutine
and keeps the main function from returning and terminating this example.

In actual applications a scope will come from an entity with a limited
lifetime. As soon as the lifetime of this entity is terminated, the corresponding scope is cancelled, cancelling
the collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works
like the `addEventListener`. However, there is no need for the corresponding `removeEventListener` function,
as cancellation and structured concurrency serve this purpose.
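
The following is a minimal sketch of such an entity, assuming a component that owns its own scope. The `EventScreen` class, its `listen` and `close` methods, and the `ticks` flow are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical component with a limited lifetime (illustrative only)
class EventScreen {
    // The scope lives exactly as long as the component does
    private val scope = CoroutineScope(Dispatchers.Default)

    // Plays the role of addEventListener
    fun listen(events: Flow<Int>): Job =
        events
            .onEach { event -> println("Event: $event") }
            .launchIn(scope)

    // End of the lifetime: cancelling the scope cancels every collection launched in it
    fun close() = scope.cancel()
}

// Imitate an endless flow of events
fun ticks(): Flow<Int> = flow {
    var i = 1
    while (true) {
        delay(100)
        emit(i++)
    }
}

fun main() = runBlocking<Unit> {
    val screen = EventScreen()
    val job = screen.listen(ticks())
    delay(350) // let a few events through
    screen.close()
    job.join() // wait until the collection has actually stopped
    println("Closed, collection cancelled: ${job.isCancelled}")
}
```

The flow in this sketch never completes on its own. The collection stops only because `close` cancels the scope, which is the role `removeEventListener` would otherwise play.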

Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] only the corresponding flow collection
coroutine, without cancelling the whole scope, or to [join][Job.join] it.
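
As a small sketch of this, the example below cancels a single collection through its `Job` and then checks that the enclosing scope is still active. The endless `events` flow and the timings are assumptions made for the illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate an endless flow of events
fun events(): Flow<Int> = flow {
    var i = 1
    while (true) {
        delay(100)
        emit(i++)
    }
}

fun main() = runBlocking<Unit> {
    val job = events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this)
    delay(250)
    job.cancel() // cancels only this collection; the runBlocking scope stays active
    job.join()   // waits until the collection coroutine has finished
    println("Scope is still active: $isActive")
}
```

Only the events emitted before the cancellation are printed, and `main` returns even though the flow itself is endless.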

### Flow and Reactive Streams

For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and Project Reactor,
the design of Flow may look very familiar.

Indeed, its design was inspired by Reactive Streams and its various implementations. But the main goal of Flow is to have as simple a design as possible,
to be Kotlin- and suspension-friendly, and to respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in the [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article.

While being different, conceptually, Flow *is* a reactive stream, and it is possible to convert it to a reactive (spec and TCK compliant) Publisher and vice versa.
Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in the corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2` for RxJava2).
Integration modules include conversions from and to `Flow`, integration with Reactor's `Context` and suspension-friendly ways to work with various reactive entities.
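
A minimal round-trip sketch is shown below. It assumes that the `kotlinx-coroutines-reactive` module is on the classpath and that the `asPublisher` and `asFlow` extensions live in the `kotlinx.coroutines.reactive` package, as they do in recent versions of the library. The `roundTrip` function is a hypothetical helper introduced for this illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher

// Hypothetical helper: Flow -> Publisher -> Flow
suspend fun roundTrip(): List<Int> {
    // Expose a flow as a Reactive Streams Publisher
    val publisher: Publisher<Int> = (1..3).asFlow().asPublisher()
    // Convert the Publisher back to a flow and collect it
    return publisher.asFlow().toList()
}

fun main() = runBlocking<Unit> {
    println(roundTrip())
}
```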

<!-- stdlib references -->

[collections]: https://kotlinlang.org/docs/reference/collections-overview.html
[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/index.html
[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/index.html
[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Dispatchers.Main]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
<!--- INDEX kotlinx.coroutines.flow -->
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
[FlowCollector.emit]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
[collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
[flowOf]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
[map]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
[filter]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
[transform]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
[take]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
[toList]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
[toSet]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
[first]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
[single]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
[reduce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
[fold]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
[flowOn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
[buffer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
[conflate]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
[collectLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
[zip]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
[combine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
[onEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
[flatMapConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
[flattenConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
[flatMapMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
[flattenMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
[DEFAULT_CONCURRENCY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
[flatMapLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
<!--- END -->
1866<!--- END -->