blob: ce4e80f1bbdfa8f65bbcb61cc6d4b06671d457d0 [file] [log] [blame] [view]
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
2/*
3 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
4 */
5
6// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
7package kotlinx.coroutines.guide.$$1$$2
8-->
9<!--- KNIT ../kotlinx-coroutines-core/jvm/test/guide/.*-##\.kt -->
10<!--- TEST_OUT ../kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt
11// This file was automatically generated from flow.md by Knit tool. Do not edit.
12package kotlinx.coroutines.guide.test
13
14import org.junit.Test
15
16class FlowGuideTest {
17-->
18
19**Table of contents**
20
21<!--- TOC -->
22
23* [Asynchronous Flow](#asynchronous-flow)
24 * [Representing multiple values](#representing-multiple-values)
25 * [Sequences](#sequences)
26 * [Suspending functions](#suspending-functions)
27 * [Flows](#flows)
28 * [Flows are cold](#flows-are-cold)
29 * [Flow cancellation](#flow-cancellation)
30 * [Flow builders](#flow-builders)
31 * [Intermediate flow operators](#intermediate-flow-operators)
32 * [Transform operator](#transform-operator)
33 * [Size-limiting operators](#size-limiting-operators)
34 * [Terminal flow operators](#terminal-flow-operators)
35 * [Flows are sequential](#flows-are-sequential)
36 * [Flow context](#flow-context)
37 * [Wrong emission withContext](#wrong-emission-withcontext)
38 * [flowOn operator](#flowon-operator)
39 * [Buffering](#buffering)
40 * [Conflation](#conflation)
41 * [Processing the latest value](#processing-the-latest-value)
42 * [Composing multiple flows](#composing-multiple-flows)
43 * [Zip](#zip)
44 * [Combine](#combine)
45 * [Flattening flows](#flattening-flows)
46 * [flatMapConcat](#flatmapconcat)
47 * [flatMapMerge](#flatmapmerge)
48 * [flatMapLatest](#flatmaplatest)
49 * [Flow exceptions](#flow-exceptions)
50 * [Collector try and catch](#collector-try-and-catch)
51 * [Everything is caught](#everything-is-caught)
52 * [Exception transparency](#exception-transparency)
53 * [Transparent catch](#transparent-catch)
54 * [Catching declaratively](#catching-declaratively)
55 * [Flow completion](#flow-completion)
56 * [Imperative finally block](#imperative-finally-block)
57 * [Declarative handling](#declarative-handling)
58 * [Upstream exceptions only](#upstream-exceptions-only)
59 * [Imperative versus declarative](#imperative-versus-declarative)
60 * [Launching flow](#launching-flow)
Vsevolod Tolstopyatovc99704a2019-09-24 19:30:49 +030061 * [Flow and Reactive Streams](#flow-and-reactive-streams)
Roman Elizarov3258e1f2019-08-22 20:08:48 +030062
63<!--- END_TOC -->
64
65## Asynchronous Flow
66
David.Watsonbb714c52019-08-30 17:49:42 +020067Suspending functions asynchronously returns a single value, but how can we return
68multiple asynchronously computed values? This is where Kotlin Flows come in.
Roman Elizarov3258e1f2019-08-22 20:08:48 +030069
70### Representing multiple values
71
72Multiple values can be represented in Kotlin using [collections].
73For example, we can have a function `foo()` that returns a [List]
David.Watsonbb714c52019-08-30 17:49:42 +020074of three numbers and then print them all using [forEach]:
Roman Elizarov3258e1f2019-08-22 20:08:48 +030075
76<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
77
78```kotlin
79fun foo(): List<Int> = listOf(1, 2, 3)
80
81fun main() {
82 foo().forEach { value -> println(value) }
83}
84```
85
86</div>
87
David.Watsonbb714c52019-08-30 17:49:42 +020088> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +030089
90This code outputs:
91
92```text
931
942
953
96```
97
98<!--- TEST -->
99
100#### Sequences
101
David.Watsonbb714c52019-08-30 17:49:42 +0200102If we are computing the numbers with some CPU-consuming blocking code
103(each computation taking 100ms), then we can represent the numbers using a [Sequence]:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300104
105<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
106
107```kotlin
108fun foo(): Sequence<Int> = sequence { // sequence builder
109 for (i in 1..3) {
110 Thread.sleep(100) // pretend we are computing it
111 yield(i) // yield next value
112 }
113}
114
115fun main() {
116 foo().forEach { value -> println(value) }
117}
118```
119
120</div>
121
David.Watsonbb714c52019-08-30 17:49:42 +0200122> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300123
124This code outputs the same numbers, but it waits 100ms before printing each one.
125
126<!--- TEST
1271
1282
1293
130-->
131
132#### Suspending functions
133
134However, this computation blocks the main thread that is running the code.
David.Watsonbb714c52019-08-30 17:49:42 +0200135When these values are computed by asynchronous code we can mark the function `foo` with a `suspend` modifier,
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300136so that it can perform its work without blocking and return the result as a list:
137
138<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
139
140```kotlin
141import kotlinx.coroutines.*
142
143//sampleStart
144suspend fun foo(): List<Int> {
145 delay(1000) // pretend we are doing something asynchronous here
146 return listOf(1, 2, 3)
147}
148
149fun main() = runBlocking<Unit> {
150 foo().forEach { value -> println(value) }
151}
152//sampleEnd
153```
154
155</div>
156
David.Watsonbb714c52019-08-30 17:49:42 +0200157> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300158
159This code prints the numbers after waiting for a second.
160
161<!--- TEST
1621
1632
1643
165-->
166
167#### Flows
168
David.Watsonbb714c52019-08-30 17:49:42 +0200169Using the `List<Int>` result type, means we can only return all the values at once. To represent
170the stream of values that are being asynchronously computed, we can use a [`Flow<Int>`][Flow] type just like we would the `Sequence<Int>` type for synchronously computed values:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300171
172<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
173
174```kotlin
175import kotlinx.coroutines.*
176import kotlinx.coroutines.flow.*
177
178//sampleStart
179fun foo(): Flow<Int> = flow { // flow builder
180 for (i in 1..3) {
181 delay(100) // pretend we are doing something useful here
182 emit(i) // emit next value
183 }
184}
185
186fun main() = runBlocking<Unit> {
David.Watsonbb714c52019-08-30 17:49:42 +0200187 // Launch a concurrent coroutine to check if the main thread is blocked
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300188 launch {
189 for (k in 1..3) {
190 println("I'm not blocked $k")
191 delay(100)
192 }
193 }
194 // Collect the flow
195 foo().collect { value -> println(value) }
196}
197//sampleEnd
198```
199
200</div>
201
David.Watsonbb714c52019-08-30 17:49:42 +0200202> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300203
204This code waits 100ms before printing each number without blocking the main thread. This is verified
205by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:
206
207```text
208I'm not blocked 1
2091
210I'm not blocked 2
2112
212I'm not blocked 3
2133
214```
215
216<!--- TEST -->
217
David.Watsonbb714c52019-08-30 17:49:42 +0200218Notice the following differences in the code with the [Flow] from the earlier examples:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300219
220* A builder function for [Flow] type is called [flow].
221* Code inside the `flow { ... }` builder block can suspend.
222* The function `foo()` is no longer marked with `suspend` modifier.
223* Values are _emitted_ from the flow using [emit][FlowCollector.emit] function.
224* Values are _collected_ from the flow using [collect][collect] function.
225
David.Watsonbb714c52019-08-30 17:49:42 +0200226> We can replace [delay] with `Thread.sleep` in the body of `foo`'s `flow { ... }` and see that the main
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300227thread is blocked in this case.
228
229### Flows are cold
230
David.Watsonbb714c52019-08-30 17:49:42 +0200231Flows are _cold_ streams similar to sequences &mdash; the code inside a [flow] builder does not
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300232run until the flow is collected. This becomes clear in the following example:
233
234<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
235
236```kotlin
237import kotlinx.coroutines.*
238import kotlinx.coroutines.flow.*
239
240//sampleStart
241fun foo(): Flow<Int> = flow {
242 println("Flow started")
243 for (i in 1..3) {
244 delay(100)
245 emit(i)
246 }
247}
248
249fun main() = runBlocking<Unit> {
250 println("Calling foo...")
251 val flow = foo()
252 println("Calling collect...")
253 flow.collect { value -> println(value) }
254 println("Calling collect again...")
255 flow.collect { value -> println(value) }
256}
257//sampleEnd
258```
259
260</div>
261
David.Watsonbb714c52019-08-30 17:49:42 +0200262> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300263
264Which prints:
265
266```text
267Calling foo...
268Calling collect...
269Flow started
2701
2712
2723
273Calling collect again...
274Flow started
2751
2762
2773
278```
279
280<!--- TEST -->
281
David.Watsonbb714c52019-08-30 17:49:42 +0200282This is a key reason the `foo()` function (which returns a flow) is not marked with `suspend` modifier.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300283By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
David.Watsonbb714c52019-08-30 17:49:42 +0200284that is why we see "Flow started" when we call `collect` again.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300285
286### Flow cancellation
287
David.Watsonbb714c52019-08-30 17:49:42 +0200288Flow adheres to the general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300289additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be
David.Watsonbb714c52019-08-30 17:49:42 +0200290cancelled when the flow is suspended in a cancellable suspending function (like [delay]), and cannot be cancelled otherwise.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300291
David.Watsonbb714c52019-08-30 17:49:42 +0200292The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300293and stops executing its code:
294
295<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
296
297```kotlin
298import kotlinx.coroutines.*
299import kotlinx.coroutines.flow.*
300
301//sampleStart
302fun foo(): Flow<Int> = flow {
303 for (i in 1..3) {
304 delay(100)
305 println("Emitting $i")
306 emit(i)
307 }
308}
309
310fun main() = runBlocking<Unit> {
311 withTimeoutOrNull(250) { // Timeout after 250ms
312 foo().collect { value -> println(value) }
313 }
314 println("Done")
315}
316//sampleEnd
317```
318
319</div>
320
David.Watsonbb714c52019-08-30 17:49:42 +0200321> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300322
323Notice how only two numbers get emitted by the flow in `foo()` function, producing the following output:
324
325```text
326Emitting 1
3271
328Emitting 2
3292
330Done
331```
332
333<!--- TEST -->
334
335### Flow builders
336
337The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
David.Watsonbb714c52019-08-30 17:49:42 +0200338easier declaration of flows:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300339
340* [flowOf] builder that defines a flow emitting a fixed set of values.
341* Various collections and sequences can be converted to flows using `.asFlow()` extension functions.
342
David.Watsonbb714c52019-08-30 17:49:42 +0200343So, the example that prints the numbers from 1 to 3 from a flow can be written as:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300344
345<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
346
347```kotlin
348import kotlinx.coroutines.*
349import kotlinx.coroutines.flow.*
350
351fun main() = runBlocking<Unit> {
352//sampleStart
353 // Convert an integer range to a flow
354 (1..3).asFlow().collect { value -> println(value) }
355//sampleEnd
356}
357```
358
359</div>
360
David.Watsonbb714c52019-08-30 17:49:42 +0200361> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300362
363<!--- TEST
3641
3652
3663
367-->
368
369### Intermediate flow operators
370
David.Watsonbb714c52019-08-30 17:49:42 +0200371Flows can be transformed with operators, just as you would with collections and sequences.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300372Intermediate operators are applied to an upstream flow and return a downstream flow.
373These operators are cold, just like flows are. A call to such an operator is not
374a suspending function itself. It works quickly, returning the definition of a new transformed flow.
375
376The basic operators have familiar names like [map] and [filter].
David.Watsonbb714c52019-08-30 17:49:42 +0200377The important difference to sequences is that blocks of
378code inside these operators can call suspending functions.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300379
380For example, a flow of incoming requests can be
David.Watsonbb714c52019-08-30 17:49:42 +0200381mapped to the results with the [map] operator, even when performing a request is a long-running
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300382operation that is implemented by a suspending function:
383
384<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
385
386```kotlin
387import kotlinx.coroutines.*
388import kotlinx.coroutines.flow.*
389
390//sampleStart
391suspend fun performRequest(request: Int): String {
392 delay(1000) // imitate long-running asynchronous work
393 return "response $request"
394}
395
396fun main() = runBlocking<Unit> {
397 (1..3).asFlow() // a flow of requests
398 .map { request -> performRequest(request) }
399 .collect { response -> println(response) }
400}
401//sampleEnd
402```
403
404</div>
405
David.Watsonbb714c52019-08-30 17:49:42 +0200406> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300407
David.Watsonbb714c52019-08-30 17:49:42 +0200408It produces the following three lines, each line appearing after each second:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300409
410```text
411response 1
412response 2
413response 3
414```
415
416<!--- TEST -->
417
418#### Transform operator
419
420Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
David.Watsonbb714c52019-08-30 17:49:42 +0200421simple transformations like [map] and [filter], as well as implement more complex transformations.
422Using the `transform` operator, we can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300423
424For example, using `transform` we can emit a string before performing a long-running asynchronous request
425and follow it with a response:
426
427<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
428
429```kotlin
430import kotlinx.coroutines.*
431import kotlinx.coroutines.flow.*
432
433suspend fun performRequest(request: Int): String {
434 delay(1000) // imitate long-running asynchronous work
435 return "response $request"
436}
437
438fun main() = runBlocking<Unit> {
439//sampleStart
440 (1..3).asFlow() // a flow of requests
441 .transform { request ->
442 emit("Making request $request")
443 emit(performRequest(request))
444 }
445 .collect { response -> println(response) }
446//sampleEnd
447}
448```
449
450</div>
451
David.Watsonbb714c52019-08-30 17:49:42 +0200452> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300453
454The output of this code is:
455
456```text
457Making request 1
458response 1
459Making request 2
460response 2
461Making request 3
462response 3
463```
464
465<!--- TEST -->
466
467#### Size-limiting operators
468
469Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
David.Watsonbb714c52019-08-30 17:49:42 +0200470is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300471functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation:
472
473<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
474
475```kotlin
476import kotlinx.coroutines.*
477import kotlinx.coroutines.flow.*
478
479//sampleStart
480fun numbers(): Flow<Int> = flow {
481 try {
482 emit(1)
483 emit(2)
484 println("This line will not execute")
485 emit(3)
486 } finally {
487 println("Finally in numbers")
488 }
489}
490
491fun main() = runBlocking<Unit> {
492 numbers()
493 .take(2) // take only the first two
494 .collect { value -> println(value) }
495}
496//sampleEnd
497```
498
499</div>
500
David.Watsonbb714c52019-08-30 17:49:42 +0200501> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300502
David.Watsonbb714c52019-08-30 17:49:42 +0200503The output of this code clearly shows that the execution of the `flow { ... }` body in the `numbers()` function
504stopped after emitting the second number:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300505
506```text
5071
5082
509Finally in numbers
510```
511
512<!--- TEST -->
513
514### Terminal flow operators
515
516Terminal operators on flows are _suspending functions_ that start a collection of the flow.
David.Watsonbb714c52019-08-30 17:49:42 +0200517The [collect] operator is the most basic one, but there are other terminal operators, which can make it easier:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300518
519* Conversion to various collections like [toList] and [toSet].
520* Operators to get the [first] value and to ensure that a flow emits a [single] value.
521* Reducing a flow to a value with [reduce] and [fold].
522
523For example:
524
525<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
526
527```kotlin
528import kotlinx.coroutines.*
529import kotlinx.coroutines.flow.*
530
531fun main() = runBlocking<Unit> {
532//sampleStart
533 val sum = (1..5).asFlow()
534 .map { it * it } // squares of numbers from 1 to 5
535 .reduce { a, b -> a + b } // sum them (terminal operator)
536 println(sum)
537//sampleEnd
538}
539```
540
541</div>
542
David.Watsonbb714c52019-08-30 17:49:42 +0200543> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300544
545Prints a single number:
546
547```text
54855
549```
550
551<!--- TEST -->
552
553### Flows are sequential
554
555Each individual collection of a flow is performed sequentially unless special operators that operate
556on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator.
557No new coroutines are launched by default.
David.Watsonbb714c52019-08-30 17:49:42 +0200558Each emitted value is processed by all the intermediate operators from
559upstream to downstream and is then delivered to the terminal operator after.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300560
David.Watsonbb714c52019-08-30 17:49:42 +0200561See the following example that filters the even integers and maps them to strings:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300562
563<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
564
565```kotlin
566import kotlinx.coroutines.*
567import kotlinx.coroutines.flow.*
568
569fun main() = runBlocking<Unit> {
570//sampleStart
571 (1..5).asFlow()
572 .filter {
573 println("Filter $it")
574 it % 2 == 0
575 }
576 .map {
577 println("Map $it")
578 "string $it"
579 }.collect {
580 println("Collect $it")
581 }
582//sampleEnd
583}
584```
585
586</div>
587
David.Watsonbb714c52019-08-30 17:49:42 +0200588> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300589
590Producing:
591
592```text
593Filter 1
594Filter 2
595Map 2
596Collect string 2
597Filter 3
598Filter 4
599Map 4
600Collect string 4
601Filter 5
602```
603
604<!--- TEST -->
605
606### Flow context
607
608Collection of a flow always happens in the context of the calling coroutine. For example, if there is
609a `foo` flow, then the following code runs in the context specified
David.Watsonbb714c52019-08-30 17:49:42 +0200610by the author of this code, regardless of the implementation details of the `foo` flow:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300611
612<div class="sample" markdown="1" theme="idea" data-highlight-only>
613
614```kotlin
615withContext(context) {
616 foo.collect { value ->
617 println(value) // run in the specified context
618 }
619}
620```
621
622</div>
623
624<!--- CLEAR -->
625
626This property of a flow is called _context preservation_.
627
628So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector
629of the corresponding flow. For example, consider the implementation of `foo` that prints the thread
630it is called on and emits three numbers:
631
632<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
633
634```kotlin
635import kotlinx.coroutines.*
636import kotlinx.coroutines.flow.*
637
638fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
639
640//sampleStart
641fun foo(): Flow<Int> = flow {
642 log("Started foo flow")
643 for (i in 1..3) {
644 emit(i)
645 }
646}
647
648fun main() = runBlocking<Unit> {
649 foo().collect { value -> log("Collected $value") }
650}
651//sampleEnd
652```
653
654</div>
655
David.Watsonbb714c52019-08-30 17:49:42 +0200656> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300657
658Running this code produces:
659
660```text
661[main @coroutine#1] Started foo flow
662[main @coroutine#1] Collected 1
663[main @coroutine#1] Collected 2
664[main @coroutine#1] Collected 3
665```
666
667<!--- TEST FLEXIBLE_THREAD -->
668
669Since `foo().collect` is called from the main thread, the body of `foo`'s flow is also called in the main thread.
David.Watsonbb714c52019-08-30 17:49:42 +0200670This is the perfect default for fast-running or asynchronous code that does not care about the execution context and
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300671does not block the caller.
672
673#### Wrong emission withContext
674
675However, the long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default] and UI-updating
676code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
David.Watsonbb714c52019-08-30 17:49:42 +0200677to change the context in the code using Kotlin coroutines, but code in the `flow { ... }` builder has to honor the context
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300678preservation property and is not allowed to [emit][FlowCollector.emit] from a different context.
679
680Try running the following code:
681
682<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
683
684```kotlin
685import kotlinx.coroutines.*
686import kotlinx.coroutines.flow.*
687
688//sampleStart
689fun foo(): Flow<Int> = flow {
David.Watsonbb714c52019-08-30 17:49:42 +0200690 // The WRONG way to change context for CPU-consuming code in flow builder
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300691 kotlinx.coroutines.withContext(Dispatchers.Default) {
692 for (i in 1..3) {
693 Thread.sleep(100) // pretend we are computing it in CPU-consuming way
694 emit(i) // emit next value
695 }
696 }
697}
698
699fun main() = runBlocking<Unit> {
700 foo().collect { value -> println(value) }
701}
702//sampleEnd
703```
704
705</div>
706
David.Watsonbb714c52019-08-30 17:49:42 +0200707> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300708
709This code produces the following exception:
710
Vsevolod Tolstopyatov83943ef2019-10-22 19:26:50 +0300711```text
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300712Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
713 Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
714 but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
715 Please refer to 'flow' documentation or use 'flowOn' instead
716 at ...
Vsevolod Tolstopyatov83943ef2019-10-22 19:26:50 +0300717```
718
719<!--- TEST EXCEPTION -->
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300720
721#### flowOn operator
722
David.Watsonbb714c52019-08-30 17:49:42 +0200723The exception refers to the [flowOn] function that shall be used to change the context of the flow emission.
724The correct way to change the context of a flow is shown in the example below, which also prints the
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300725names of the corresponding threads to show how it all works:
726
727<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
728
729```kotlin
730import kotlinx.coroutines.*
731import kotlinx.coroutines.flow.*
732
733fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
734
735//sampleStart
736fun foo(): Flow<Int> = flow {
737 for (i in 1..3) {
738 Thread.sleep(100) // pretend we are computing it in CPU-consuming way
739 log("Emitting $i")
740 emit(i) // emit next value
741 }
742}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
743
744fun main() = runBlocking<Unit> {
745 foo().collect { value ->
746 log("Collected $value")
747 }
748}
749//sampleEnd
750```
751
752</div>
753
David.Watsonbb714c52019-08-30 17:49:42 +0200754> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300755
756Notice how `flow { ... }` works in the background thread, while collection happens in the main thread:
757
758<!--- TEST FLEXIBLE_THREAD
759[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
760[main @coroutine#1] Collected 1
761[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
762[main @coroutine#1] Collected 2
763[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
764[main @coroutine#1] Collected 3
765-->
766
David.Watsonbb714c52019-08-30 17:49:42 +0200767Another thing to observe here is that the [flowOn] operator has changed the default sequential nature of the flow.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300768Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
David.Watsonbb714c52019-08-30 17:49:42 +0200769("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The [flowOn] operator
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300770creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context.
771
772### Buffering
773
David.Watsonbb714c52019-08-30 17:49:42 +0200774Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300775to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
David.Watsonbb714c52019-08-30 17:49:42 +0200776the emission by `foo()` flow is slow, taking 100 ms to produce an element; and collector is also slow,
777taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300778
779<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
780
781```kotlin
782import kotlinx.coroutines.*
783import kotlinx.coroutines.flow.*
784import kotlin.system.*
785
786//sampleStart
787fun foo(): Flow<Int> = flow {
788 for (i in 1..3) {
789 delay(100) // pretend we are asynchronously waiting 100 ms
790 emit(i) // emit next value
791 }
792}
793
794fun main() = runBlocking<Unit> {
795 val time = measureTimeMillis {
796 foo().collect { value ->
797 delay(300) // pretend we are processing it for 300 ms
798 println(value)
799 }
800 }
801 println("Collected in $time ms")
802}
803//sampleEnd
804```
805
806</div>
807
David.Watsonbb714c52019-08-30 17:49:42 +0200808> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300809
David.Watsonbb714c52019-08-30 17:49:42 +0200810It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300811
812```text
8131
8142
8153
816Collected in 1220 ms
817```
818
819<!--- TEST ARBITRARY_TIME -->
820
David.Watsonbb714c52019-08-30 17:49:42 +0200821We can use a [buffer] operator on a flow to run emitting code of `foo()` concurrently with collecting code,
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300822as opposed to running them sequentially:
823
824<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
825
826```kotlin
827import kotlinx.coroutines.*
828import kotlinx.coroutines.flow.*
829import kotlin.system.*
830
831fun foo(): Flow<Int> = flow {
832 for (i in 1..3) {
833 delay(100) // pretend we are asynchronously waiting 100 ms
834 emit(i) // emit next value
835 }
836}
837
838fun main() = runBlocking<Unit> {
839//sampleStart
840 val time = measureTimeMillis {
841 foo()
842 .buffer() // buffer emissions, don't wait
843 .collect { value ->
844 delay(300) // pretend we are processing it for 300 ms
845 println(value)
846 }
847 }
848 println("Collected in $time ms")
849//sampleEnd
850}
851```
852
853</div>
854
David.Watsonbb714c52019-08-30 17:49:42 +0200855> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300856
David.Watsonbb714c52019-08-30 17:49:42 +0200857It produces the same numbers just faster, as we have effectively created a processing pipeline,
858having to only wait 100 ms for the first number and then spending only 300 ms to process
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300859each number. This way it takes around 1000 ms to run:
860
861```text
8621
8632
8643
865Collected in 1071 ms
866```
867
868<!--- TEST ARBITRARY_TIME -->
869
David.Watsonbb714c52019-08-30 17:49:42 +0200870> Note that the [flowOn] operator uses the same buffering mechanism when it has to change a [CoroutineDispatcher],
871but here we explicitly request buffering without changing the execution context.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300872
873#### Conflation
874
David.Watsonbb714c52019-08-30 17:49:42 +0200875When a flow represents partial results of the operation or operation status updates, it may not be necessary
876to process each value, but instead, only most recent ones. In this case, the [conflate] operator can be used to skip
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300877intermediate values when a collector is too slow to process them. Building on the previous example:
878
879<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
880
881```kotlin
882import kotlinx.coroutines.*
883import kotlinx.coroutines.flow.*
884import kotlin.system.*
885
886fun foo(): Flow<Int> = flow {
887 for (i in 1..3) {
888 delay(100) // pretend we are asynchronously waiting 100 ms
889 emit(i) // emit next value
890 }
891}
892
893fun main() = runBlocking<Unit> {
894//sampleStart
895 val time = measureTimeMillis {
896 foo()
897 .conflate() // conflate emissions, don't process each one
898 .collect { value ->
899 delay(300) // pretend we are processing it for 300 ms
900 println(value)
901 }
902 }
903 println("Collected in $time ms")
904//sampleEnd
905}
906```
907
908</div>
909
David.Watsonbb714c52019-08-30 17:49:42 +0200910> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300911
David.Watsonbb714c52019-08-30 17:49:42 +0200912We see that while the first number was still being processed the second, and third were already produced, so
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300913the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:
914
915```text
9161
9173
918Collected in 758 ms
919```
920
921<!--- TEST ARBITRARY_TIME -->
922
923#### Processing the latest value
924
David.Watsonbb714c52019-08-30 17:49:42 +0200925Conflation is one way to speed up processing when both the emitter and collector are slow. It does it by dropping emitted values.
926The other way is to cancel a slow collector and restart it every time a new value is emitted. There is
927a family of `xxxLatest` operators that perform the same essential logic of a `xxx` operator, but cancel the
928code in their block on a new value. Let's try changing [conflate] to [collectLatest] in the previous example:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300929
930<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
931
932```kotlin
933import kotlinx.coroutines.*
934import kotlinx.coroutines.flow.*
935import kotlin.system.*
936
937fun foo(): Flow<Int> = flow {
938 for (i in 1..3) {
939 delay(100) // pretend we are asynchronously waiting 100 ms
940 emit(i) // emit next value
941 }
942}
943
944fun main() = runBlocking<Unit> {
945//sampleStart
946 val time = measureTimeMillis {
947 foo()
948 .collectLatest { value -> // cancel & restart on the latest value
949 println("Collecting $value")
950 delay(300) // pretend we are processing it for 300 ms
951 println("Done $value")
952 }
953 }
954 println("Collected in $time ms")
955//sampleEnd
956}
957```
958
959</div>
960
David.Watsonbb714c52019-08-30 17:49:42 +0200961> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300962
963Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
964is run on every value, but completes only for the last value:
965
966```text
967Collecting 1
968Collecting 2
969Collecting 3
970Done 3
971Collected in 741 ms
972```
973
974<!--- TEST ARBITRARY_TIME -->
975
976### Composing multiple flows
977
David.Watsonbb714c52019-08-30 17:49:42 +0200978There are lots of ways to compose multiple flows.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300979
980#### Zip
981
David.Watsonbb714c52019-08-30 17:49:42 +0200982Just like the [Sequence.zip] extension function in the Kotlin standard library,
983flows have a [zip] operator that combines the corresponding values of two flows:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300984
985<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
986
987```kotlin
988import kotlinx.coroutines.*
989import kotlinx.coroutines.flow.*
990
991fun main() = runBlocking<Unit> {
992//sampleStart
993 val nums = (1..3).asFlow() // numbers 1..3
994 val strs = flowOf("one", "two", "three") // strings
995 nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
996 .collect { println(it) } // collect and print
997//sampleEnd
998}
999```
1000
1001</div>
1002
David.Watsonbb714c52019-08-30 17:49:42 +02001003> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001004
1005This example prints:
1006
1007```text
10081 -> one
10092 -> two
10103 -> three
1011```
1012
1013<!--- TEST -->
1014
1015#### Combine
1016
David.Watsonbb714c52019-08-30 17:49:42 +02001017When flow represents the most recent value of a variable or operation (see also the related
1018section on [conflation](#conflation)), it might be needed to perform a computation that depends on
1019the most recent values of the corresponding flows and to recompute it whenever any of the upstream
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001020flows emit a value. The corresponding family of operators is called [combine].
1021
1022For example, if the numbers in the previous example update every 300ms, but strings update every 400 ms,
David.Watsonbb714c52019-08-30 17:49:42 +02001023then zipping them using the [zip] operator will still produce the same result,
1024albeit results that are printed every 400 ms:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001025
David.Watsonbb714c52019-08-30 17:49:42 +02001026> We use a [onEach] intermediate operator in this example to delay each element and make the code
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001027that emits sample flows more declarative and shorter.
1028
1029<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1030
1031```kotlin
1032import kotlinx.coroutines.*
1033import kotlinx.coroutines.flow.*
1034
1035fun main() = runBlocking<Unit> {
1036//sampleStart
1037 val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
1038 val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
1039 val startTime = System.currentTimeMillis() // remember the start time
1040 nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
1041 .collect { value -> // collect and print
1042 println("$value at ${System.currentTimeMillis() - startTime} ms from start")
1043 }
1044//sampleEnd
1045}
1046```
1047
1048</div>
1049
David.Watsonbb714c52019-08-30 17:49:42 +02001050> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001051
1052<!--- TEST ARBITRARY_TIME
10531 -> one at 437 ms from start
10542 -> two at 837 ms from start
10553 -> three at 1243 ms from start
1056-->
1057
David.Watsonbb714c52019-08-30 17:49:42 +02001058However, when using a [combine] operator here instead of a [zip]:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001059
1060<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1061
1062```kotlin
1063import kotlinx.coroutines.*
1064import kotlinx.coroutines.flow.*
1065
1066fun main() = runBlocking<Unit> {
1067//sampleStart
1068 val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
1069 val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
Roman Elizarova73862f2019-09-02 17:31:14 +03001070 val startTime = System.currentTimeMillis() // remember the start time
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001071 nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
1072 .collect { value -> // collect and print
1073 println("$value at ${System.currentTimeMillis() - startTime} ms from start")
1074 }
1075//sampleEnd
1076}
1077```
1078
1079</div>
1080
David.Watsonbb714c52019-08-30 17:49:42 +02001081> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001082
1083We get quite a different output, where a line is printed at each emission from either `nums` or `strs` flows:
1084
1085```text
10861 -> one at 452 ms from start
10872 -> one at 651 ms from start
10882 -> two at 854 ms from start
10893 -> two at 952 ms from start
10903 -> three at 1256 ms from start
1091```
1092
1093<!--- TEST ARBITRARY_TIME -->
1094
1095### Flattening flows
1096
1097Flows represent asynchronously received sequences of values, so it is quite easy to get in a situation where
1098each value triggers a request for another sequence of values. For example, we can have the following
1099function that returns a flow of two strings 500 ms apart:
1100
1101<div class="sample" markdown="1" theme="idea" data-highlight-only>
1102
1103```kotlin
1104fun requestFlow(i: Int): Flow<String> = flow {
1105 emit("$i: First")
1106 delay(500) // wait 500 ms
1107 emit("$i: Second")
1108}
1109```
1110
1111</div>
1112
1113<!--- CLEAR -->
1114
1115Now if we have a flow of three integers and call `requestFlow` for each of them like this:
1116
1117<div class="sample" markdown="1" theme="idea" data-highlight-only>
1118
1119```kotlin
1120(1..3).asFlow().map { requestFlow(it) }
1121```
1122
1123</div>
1124
1125<!--- CLEAR -->
1126
1127Then we end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for
1128further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
David.Watsonbb714c52019-08-30 17:49:42 +02001129operators for this. However, due the asynchronous nature of flows they call for different _modes_ of flattening,
1130as such, there is a family of flattening operators on flows.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001131
1132#### flatMapConcat
1133
1134Concatenating mode is implemented by [flatMapConcat] and [flattenConcat] operators. They are the most direct
David.Watsonbb714c52019-08-30 17:49:42 +02001135analogues of the corresponding sequence operators. They wait for the inner flow to complete before
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001136starting to collect the next one as the following example shows:
1137
1138<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1139
1140```kotlin
1141import kotlinx.coroutines.*
1142import kotlinx.coroutines.flow.*
1143
1144fun requestFlow(i: Int): Flow<String> = flow {
1145 emit("$i: First")
1146 delay(500) // wait 500 ms
1147 emit("$i: Second")
1148}
1149
1150fun main() = runBlocking<Unit> {
1151//sampleStart
Roman Elizarova73862f2019-09-02 17:31:14 +03001152 val startTime = System.currentTimeMillis() // remember the start time
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001153 (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
1154 .flatMapConcat { requestFlow(it) }
1155 .collect { value -> // collect and print
1156 println("$value at ${System.currentTimeMillis() - startTime} ms from start")
1157 }
1158//sampleEnd
1159}
1160```
1161
1162</div>
1163
David.Watsonbb714c52019-08-30 17:49:42 +02001164> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001165
1166The sequential nature of [flatMapConcat] is clearly seen in the output:
1167
1168```text
11691: First at 121 ms from start
11701: Second at 622 ms from start
11712: First at 727 ms from start
11722: Second at 1227 ms from start
11733: First at 1328 ms from start
11743: Second at 1829 ms from start
1175```
1176
1177<!--- TEST ARBITRARY_TIME -->
1178
1179#### flatMapMerge
1180
1181Another flattening mode is to concurrently collect all the incoming flows and merge their values into
1182a single flow so that values are emitted as soon as possible.
1183It is implemented by [flatMapMerge] and [flattenMerge] operators. They both accept an optional
1184`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
1185(it is equal to [DEFAULT_CONCURRENCY] by default).
1186
1187<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1188
1189```kotlin
1190import kotlinx.coroutines.*
1191import kotlinx.coroutines.flow.*
1192
1193fun requestFlow(i: Int): Flow<String> = flow {
1194 emit("$i: First")
1195 delay(500) // wait 500 ms
1196 emit("$i: Second")
1197}
1198
1199fun main() = runBlocking<Unit> {
1200//sampleStart
Roman Elizarova73862f2019-09-02 17:31:14 +03001201 val startTime = System.currentTimeMillis() // remember the start time
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001202 (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
1203 .flatMapMerge { requestFlow(it) }
1204 .collect { value -> // collect and print
1205 println("$value at ${System.currentTimeMillis() - startTime} ms from start")
1206 }
1207//sampleEnd
1208}
1209```
1210
1211</div>
1212
David.Watsonbb714c52019-08-30 17:49:42 +02001213> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001214
1215The concurrent nature of [flatMapMerge] is obvious:
1216
1217```text
12181: First at 136 ms from start
12192: First at 231 ms from start
12203: First at 333 ms from start
12211: Second at 639 ms from start
12222: Second at 732 ms from start
12233: Second at 833 ms from start
1224```
1225
1226<!--- TEST ARBITRARY_TIME -->
1227
David.Watsonbb714c52019-08-30 17:49:42 +02001228> Note that the [flatMapMerge] calls its block of code (`{ requestFlow(it) }` in this example) sequentially, but
1229collects the resulting flows concurrently, it is the equivalent of performing a sequential
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001230`map { requestFlow(it) }` first and then calling [flattenMerge] on the result.
1231
1232#### flatMapLatest
1233
David.Watsonbb714c52019-08-30 17:49:42 +02001234In a similar way to the [collectLatest] operator, that was shown in
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001235["Processing the latest value"](#processing-the-latest-value) section, there is the corresponding "Latest"
David.Watsonbb714c52019-08-30 17:49:42 +02001236flattening mode where a collection of the previous flow is cancelled as soon as new flow is emitted.
1237It is implemented by the [flatMapLatest] operator.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001238
1239<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1240
1241```kotlin
1242import kotlinx.coroutines.*
1243import kotlinx.coroutines.flow.*
1244
1245fun requestFlow(i: Int): Flow<String> = flow {
1246 emit("$i: First")
1247 delay(500) // wait 500 ms
1248 emit("$i: Second")
1249}
1250
1251fun main() = runBlocking<Unit> {
1252//sampleStart
Roman Elizarova73862f2019-09-02 17:31:14 +03001253 val startTime = System.currentTimeMillis() // remember the start time
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001254 (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
1255 .flatMapLatest { requestFlow(it) }
1256 .collect { value -> // collect and print
1257 println("$value at ${System.currentTimeMillis() - startTime} ms from start")
1258 }
1259//sampleEnd
1260}
1261```
1262
1263</div>
1264
David.Watsonbb714c52019-08-30 17:49:42 +02001265> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001266
David.Watsonbb714c52019-08-30 17:49:42 +02001267The output here in this example is a good demonstration of how [flatMapLatest] works:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001268
1269```text
12701: First at 142 ms from start
12712: First at 322 ms from start
12723: First at 425 ms from start
12733: Second at 931 ms from start
1274```
1275
1276<!--- TEST ARBITRARY_TIME -->
1277
1278> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) on a new value.
1279It makes no difference in this particular example, because the call to `requestFlow` itself is fast, not-suspending,
1280and cannot be cancelled. However, it would show up if we were to use suspending functions like `delay` in there.
1281
1282### Flow exceptions
1283
David.Watsonbb714c52019-08-30 17:49:42 +02001284Flow collection can complete with an exception when an emitter or code inside the operators throw an exception.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001285There are several ways to handle these exceptions.
1286
1287#### Collector try and catch
1288
1289A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions:
1290
1291<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1292
1293```kotlin
1294import kotlinx.coroutines.*
1295import kotlinx.coroutines.flow.*
1296
1297//sampleStart
1298fun foo(): Flow<Int> = flow {
1299 for (i in 1..3) {
1300 println("Emitting $i")
1301 emit(i) // emit next value
1302 }
1303}
1304
1305fun main() = runBlocking<Unit> {
1306 try {
1307 foo().collect { value ->
1308 println(value)
1309 check(value <= 1) { "Collected $value" }
1310 }
1311 } catch (e: Throwable) {
1312 println("Caught $e")
1313 }
1314}
1315//sampleEnd
1316```
1317
1318</div>
1319
David.Watsonbb714c52019-08-30 17:49:42 +02001320> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001321
1322This code successfully catches an exception in [collect] terminal operator and,
David.Watsonbb714c52019-08-30 17:49:42 +02001323as we see, no more values are emitted after that:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001324
1325```text
1326Emitting 1
13271
1328Emitting 2
13292
1330Caught java.lang.IllegalStateException: Collected 2
1331```
1332
1333<!--- TEST -->
1334
1335#### Everything is caught
1336
David.Watsonbb714c52019-08-30 17:49:42 +02001337The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators.
1338For example, let's change the code so that emitted values are [mapped][map] to strings,
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001339but the corresponding code produces an exception:
1340
1341<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1342
1343```kotlin
1344import kotlinx.coroutines.*
1345import kotlinx.coroutines.flow.*
1346
1347//sampleStart
1348fun foo(): Flow<String> =
1349 flow {
1350 for (i in 1..3) {
1351 println("Emitting $i")
1352 emit(i) // emit next value
1353 }
1354 }
1355 .map { value ->
1356 check(value <= 1) { "Crashed on $value" }
1357 "string $value"
1358 }
1359
1360fun main() = runBlocking<Unit> {
1361 try {
1362 foo().collect { value -> println(value) }
1363 } catch (e: Throwable) {
1364 println("Caught $e")
1365 }
1366}
1367//sampleEnd
1368```
1369
1370</div>
1371
David.Watsonbb714c52019-08-30 17:49:42 +02001372> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001373
1374This exception is still caught and collection is stopped:
1375
1376```text
1377Emitting 1
1378string 1
1379Emitting 2
1380Caught java.lang.IllegalStateException: Crashed on 2
1381```
1382
1383<!--- TEST -->
1384
1385### Exception transparency
1386
David.Watsonbb714c52019-08-30 17:49:42 +02001387But how can code of the emitter encapsulate its exception handling behavior?
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001388
David.Watsonbb714c52019-08-30 17:49:42 +02001389Flows must be _transparent to exceptions_ and it is a violation of the exception transparency to [emit][FlowCollector.emit] values in the
1390`flow { ... }` builder from inside of a `try/catch` block. This guarantees that a collector throwing an exception
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001391can always catch it using `try/catch` as in the previous example.
1392
David.Watsonbb714c52019-08-30 17:49:42 +02001393The emitter can use a [catch] operator that preserves this exception transparency and allows encapsulation
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001394of its exception handling. The body of the `catch` operator can analyze an exception
1395and react to it in different ways depending on which exception was caught:
1396
1397* Exceptions can be rethrown using `throw`.
1398* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
1399* Exceptions can be ignored, logged, or processed by some other code.
1400
David.Watsonbb714c52019-08-30 17:49:42 +02001401For example, let us emit the text on catching an exception:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001402
1403<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1404
1405```kotlin
1406import kotlinx.coroutines.*
1407import kotlinx.coroutines.flow.*
1408
1409fun foo(): Flow<String> =
1410 flow {
1411 for (i in 1..3) {
1412 println("Emitting $i")
1413 emit(i) // emit next value
1414 }
1415 }
1416 .map { value ->
1417 check(value <= 1) { "Crashed on $value" }
1418 "string $value"
1419 }
1420
1421fun main() = runBlocking<Unit> {
1422//sampleStart
1423 foo()
1424 .catch { e -> emit("Caught $e") } // emit on exception
1425 .collect { value -> println(value) }
1426//sampleEnd
1427}
1428```
1429
1430</div>
1431
David.Watsonbb714c52019-08-30 17:49:42 +02001432> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001433
1434The output of the example is the same, even though we do not have `try/catch` around the code anymore.
1435
1436<!--- TEST
1437Emitting 1
1438string 1
1439Emitting 2
1440Caught java.lang.IllegalStateException: Crashed on 2
1441-->
1442
1443#### Transparent catch
1444
1445The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
1446(that is an exception from all the operators above `catch`, but not below it).
1447If the block in `collect { ... }` (placed below `catch`) throws an exception then it escapes:
1448
1449<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1450
1451```kotlin
1452import kotlinx.coroutines.*
1453import kotlinx.coroutines.flow.*
1454
1455//sampleStart
1456fun foo(): Flow<Int> = flow {
1457 for (i in 1..3) {
1458 println("Emitting $i")
1459 emit(i)
1460 }
1461}
1462
1463fun main() = runBlocking<Unit> {
1464 foo()
1465 .catch { e -> println("Caught $e") } // does not catch downstream exceptions
1466 .collect { value ->
1467 check(value <= 1) { "Collected $value" }
1468 println(value)
1469 }
1470}
1471//sampleEnd
1472```
1473
1474</div>
1475
David.Watsonbb714c52019-08-30 17:49:42 +02001476> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001477
David.Watsonbb714c52019-08-30 17:49:42 +02001478A "Caught ..." message is not printed despite there being a `catch` operator:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001479
1480<!--- TEST EXCEPTION
1481Emitting 1
14821
1483Emitting 2
1484Exception in thread "main" java.lang.IllegalStateException: Collected 2
1485 at ...
1486-->
1487
1488#### Catching declaratively
1489
David.Watsonbb714c52019-08-30 17:49:42 +02001490We can combine the declarative nature of the [catch] operator with a desire to handle all the exceptions, by moving the body
1491of the [collect] operator into [onEach] and putting it before the `catch` operator. Collection of this flow must
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001492be triggered by a call to `collect()` without parameters:
1493
1494<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1495
1496```kotlin
1497import kotlinx.coroutines.*
1498import kotlinx.coroutines.flow.*
1499
1500fun foo(): Flow<Int> = flow {
1501 for (i in 1..3) {
1502 println("Emitting $i")
1503 emit(i)
1504 }
1505}
1506
1507fun main() = runBlocking<Unit> {
1508//sampleStart
1509 foo()
1510 .onEach { value ->
1511 check(value <= 1) { "Collected $value" }
1512 println(value)
1513 }
1514 .catch { e -> println("Caught $e") }
1515 .collect()
1516//sampleEnd
1517}
1518```
1519
1520</div>
1521
David.Watsonbb714c52019-08-30 17:49:42 +02001522> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001523
David.Watsonbb714c52019-08-30 17:49:42 +02001524Now we can see that a "Caught ..." message is printed and so we can catch all the exceptions without explicitly
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001525using a `try/catch` block:
1526
1527<!--- TEST EXCEPTION
1528Emitting 1
15291
1530Emitting 2
1531Caught java.lang.IllegalStateException: Collected 2
1532-->
1533
1534### Flow completion
1535
David.Watsonbb714c52019-08-30 17:49:42 +02001536When flow collection completes (normally or exceptionally) it may need to execute an action.
1537As you may have already noticed, it can be done in two ways: imperative or declarative.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001538
1539#### Imperative finally block
1540
David.Watsonbb714c52019-08-30 17:49:42 +02001541In addition to `try`/`catch`, a collector can also use a `finally` block to execute an action
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001542upon `collect` completion.
1543
1544<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1545
1546```kotlin
1547import kotlinx.coroutines.*
1548import kotlinx.coroutines.flow.*
1549
1550//sampleStart
1551fun foo(): Flow<Int> = (1..3).asFlow()
1552
1553fun main() = runBlocking<Unit> {
1554 try {
1555 foo().collect { value -> println(value) }
1556 } finally {
1557 println("Done")
1558 }
1559}
1560//sampleEnd
1561```
1562
1563</div>
1564
David.Watsonbb714c52019-08-30 17:49:42 +02001565> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001566
David.Watsonbb714c52019-08-30 17:49:42 +02001567This code prints three numbers produced by the `foo()` flow followed by a "Done" string:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001568
1569```text
15701
15712
15723
1573Done
1574```
1575
1576<!--- TEST -->
1577
1578#### Declarative handling
1579
David.Watsonbb714c52019-08-30 17:49:42 +02001580For the declarative approach, flow has [onCompletion] intermediate operator that is invoked
1581when the flow has completely collected.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001582
David.Watsonbb714c52019-08-30 17:49:42 +02001583The previous example can be rewritten using an [onCompletion] operator and produces the same output:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001584
1585<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1586
1587```kotlin
1588import kotlinx.coroutines.*
1589import kotlinx.coroutines.flow.*
1590
1591fun foo(): Flow<Int> = (1..3).asFlow()
1592
1593fun main() = runBlocking<Unit> {
1594//sampleStart
1595 foo()
1596 .onCompletion { println("Done") }
1597 .collect { value -> println(value) }
1598//sampleEnd
1599}
1600```
1601</div>
1602
David.Watsonbb714c52019-08-30 17:49:42 +02001603> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001604
1605<!--- TEST
16061
16072
16083
1609Done
1610-->
1611
1612The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
David.Watsonbb714c52019-08-30 17:49:42 +02001613to determine whether the flow collection was completed normally or exceptionally. In the following
1614example the `foo()` flow throws an exception after emitting the number 1:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001615
1616<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1617
1618```kotlin
1619import kotlinx.coroutines.*
1620import kotlinx.coroutines.flow.*
1621
1622//sampleStart
1623fun foo(): Flow<Int> = flow {
1624 emit(1)
1625 throw RuntimeException()
1626}
1627
1628fun main() = runBlocking<Unit> {
1629 foo()
1630 .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
1631 .catch { cause -> println("Caught exception") }
1632 .collect { value -> println(value) }
1633}
1634//sampleEnd
1635```
1636</div>
1637
David.Watsonbb714c52019-08-30 17:49:42 +02001638> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001639
1640As you may expect, it prints:
1641
1642```text
16431
1644Flow completed exceptionally
1645Caught exception
1646```
1647
1648<!--- TEST -->
1649
David.Watsonbb714c52019-08-30 17:49:42 +02001650The [onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001651example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
David.Watsonbb714c52019-08-30 17:49:42 +02001652and can be handled with a `catch` operator.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001653
1654#### Upstream exceptions only
1655
David.Watsonbb714c52019-08-30 17:49:42 +02001656Just like the [catch] operator, [onCompletion] only sees exceptions coming from upstream and does not
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001657see downstream exceptions. For example, run the following code:
1658
1659<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1660
1661```kotlin
1662import kotlinx.coroutines.*
1663import kotlinx.coroutines.flow.*
1664
1665//sampleStart
1666fun foo(): Flow<Int> = (1..3).asFlow()
1667
1668fun main() = runBlocking<Unit> {
1669 foo()
1670 .onCompletion { cause -> println("Flow completed with $cause") }
1671 .collect { value ->
1672 check(value <= 1) { "Collected $value" }
1673 println(value)
1674 }
1675}
1676//sampleEnd
1677```
1678
1679</div>
1680
David.Watsonbb714c52019-08-30 17:49:42 +02001681> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001682
David.Watsonbb714c52019-08-30 17:49:42 +02001683We can see the completion cause is null, yet collection failed with exception:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001684
1685```text
16861
1687Flow completed with null
1688Exception in thread "main" java.lang.IllegalStateException: Collected 2
1689```
1690
1691<!--- TEST EXCEPTION -->
1692
1693### Imperative versus declarative
1694
David.Watsonbb714c52019-08-30 17:49:42 +02001695Now we know how to collect flow, and handle its completion and exceptions in both imperative and declarative ways.
1696The natural question here is, which approach is preferred and why?
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001697As a library, we do not advocate for any particular approach and believe that both options
1698are valid and should be selected according to your own preferences and code style.
1699
1700### Launching flow
1701
David.Watsonbb714c52019-08-30 17:49:42 +02001702It is easy to use flows to represent asynchronous events that are coming from some source.
1703In this case, we need an analogue of the `addEventListener` function that registers a piece of code with a reaction
1704for incoming events and continues further work. The [onEach] operator can serve this role.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001705However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow.
1706Otherwise, just calling `onEach` has no effect.
1707
David.Watsonbb714c52019-08-30 17:49:42 +02001708If we use the [collect] terminal operator after `onEach`, then the code after it will wait until the flow is collected:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001709
1710<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1711
1712```kotlin
1713import kotlinx.coroutines.*
1714import kotlinx.coroutines.flow.*
1715
1716//sampleStart
1717// Imitate a flow of events
1718fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
1719
1720fun main() = runBlocking<Unit> {
1721 events()
1722 .onEach { event -> println("Event: $event") }
1723 .collect() // <--- Collecting the flow waits
1724 println("Done")
1725}
1726//sampleEnd
1727```
1728
1729</div>
1730
David.Watsonbb714c52019-08-30 17:49:42 +02001731> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001732
1733As you can see, it prints:
1734
1735```text
1736Event: 1
1737Event: 2
1738Event: 3
1739Done
1740```
1741
1742<!--- TEST -->
1743
David.Watsonbb714c52019-08-30 17:49:42 +02001744The [launchIn] terminal operator comes in handy here. By replacing `collect` with `launchIn` we can
1745launch a collection of the flow in a separate coroutine, so that execution of further code
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001746immediately continues:
1747
1748<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1749
1750```kotlin
1751import kotlinx.coroutines.*
1752import kotlinx.coroutines.flow.*
1753
1754// Imitate a flow of events
1755fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
1756
1757//sampleStart
1758fun main() = runBlocking<Unit> {
1759 events()
1760 .onEach { event -> println("Event: $event") }
1761 .launchIn(this) // <--- Launching the flow in a separate coroutine
1762 println("Done")
1763}
1764//sampleEnd
1765```
1766
1767</div>
1768
David.Watsonbb714c52019-08-30 17:49:42 +02001769> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001770
1771It prints:
1772
1773```text
1774Done
1775Event: 1
1776Event: 2
1777Event: 3
1778```
1779
1780<!--- TEST -->
1781
1782The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is
David.Watsonbb714c52019-08-30 17:49:42 +02001783launched. In the above example this scope comes from the [runBlocking]
1784coroutine builder, so while the flow is running, this [runBlocking] scope waits for completion of its child coroutine
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001785and keeps the main function from returning and terminating this example.
1786
David.Watsonbb714c52019-08-30 17:49:42 +02001787In actual applications a scope will come from an entity with a limited
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001788lifetime. As soon as the lifetime of this entity is terminated the corresponding scope is cancelled, cancelling
David.Watsonbb714c52019-08-30 17:49:42 +02001789the collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works
1790like the `addEventListener`. However, there is no need for the corresponding `removeEventListener` function,
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001791as cancellation and structured concurrency serve this purpose.
1792
David.Watsonbb714c52019-08-30 17:49:42 +02001793Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] the corresponding flow collection
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001794coroutine only without cancelling the whole scope or to [join][Job.join] it.
1795
Vsevolod Tolstopyatovc99704a2019-09-24 19:30:49 +03001796### Flow and Reactive Streams
1797
1798For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and project Reactor,
1799design of the Flow may look very familiar.
1800
1801Indeed, its design was inspired by Reactive Streams and its various implementations. But Flow main goal is to have as simple design as possible,
1802be Kotlin and suspension friendly and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article.
1803
1804While being different, conceptually, Flow *is* a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa.
1805Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2` for RxJava2).
1806Integration modules include conversions from and to `Flow`, integration with Reactor's `Context` and suspension-friendly ways to work with various reactive entities.
1807
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001808<!-- stdlib references -->
1809
1810[collections]: https://kotlinlang.org/docs/reference/collections-overview.html
1811[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/index.html
1812[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
1813[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/index.html
1814[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
1815[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
1816[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
1817[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html
1818
1819<!--- MODULE kotlinx-coroutines-core -->
1820<!--- INDEX kotlinx.coroutines -->
1821[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
1822[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
1823[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
1824[Dispatchers.Main]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
1825[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
1826[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
1827[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
1828[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
1829[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
1830[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
1831[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
1832<!--- INDEX kotlinx.coroutines.flow -->
1833[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
1834[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
1835[FlowCollector.emit]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
1836[collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
1837[flowOf]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
1838[map]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
1839[filter]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
1840[transform]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
1841[take]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
1842[toList]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
1843[toSet]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
1844[first]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
1845[single]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
1846[reduce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
1847[fold]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
1848[flowOn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
1849[buffer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
1850[conflate]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
1851[collectLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
1852[zip]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
1853[combine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
1854[onEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
1855[flatMapConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
1856[flattenConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
1857[flatMapMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
1858[flattenMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
1859[DEFAULT_CONCURRENCY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
1860[flatMapLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
1861[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
1862[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
1863[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
1864<!--- END -->