<!--- TEST_NAME FlowGuideTest -->

**Table of contents**

<!--- TOC -->

* [Asynchronous Flow](#asynchronous-flow)
  * [Representing multiple values](#representing-multiple-values)
    * [Sequences](#sequences)
    * [Suspending functions](#suspending-functions)
    * [Flows](#flows)
  * [Flows are cold](#flows-are-cold)
  * [Flow cancellation](#flow-cancellation)
  * [Flow builders](#flow-builders)
  * [Intermediate flow operators](#intermediate-flow-operators)
    * [Transform operator](#transform-operator)
    * [Size-limiting operators](#size-limiting-operators)
  * [Terminal flow operators](#terminal-flow-operators)
  * [Flows are sequential](#flows-are-sequential)
  * [Flow context](#flow-context)
    * [Wrong emission withContext](#wrong-emission-withcontext)
    * [flowOn operator](#flowon-operator)
  * [Buffering](#buffering)
    * [Conflation](#conflation)
    * [Processing the latest value](#processing-the-latest-value)
  * [Composing multiple flows](#composing-multiple-flows)
    * [Zip](#zip)
    * [Combine](#combine)
  * [Flattening flows](#flattening-flows)
    * [flatMapConcat](#flatmapconcat)
    * [flatMapMerge](#flatmapmerge)
    * [flatMapLatest](#flatmaplatest)
  * [Flow exceptions](#flow-exceptions)
    * [Collector try and catch](#collector-try-and-catch)
    * [Everything is caught](#everything-is-caught)
  * [Exception transparency](#exception-transparency)
    * [Transparent catch](#transparent-catch)
    * [Catching declaratively](#catching-declaratively)
  * [Flow completion](#flow-completion)
    * [Imperative finally block](#imperative-finally-block)
    * [Declarative handling](#declarative-handling)
    * [Upstream exceptions only](#upstream-exceptions-only)
  * [Imperative versus declarative](#imperative-versus-declarative)
  * [Launching flow](#launching-flow)
  * [Flow and Reactive Streams](#flow-and-reactive-streams)

<!--- END -->

## Asynchronous Flow

A suspending function asynchronously returns a single value, but how can we return
multiple asynchronously computed values? This is where Kotlin Flows come in.

### Representing multiple values

Multiple values can be represented in Kotlin using [collections].
For example, we can have a function `foo()` that returns a [List]
of three numbers and then print them all using [forEach]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
fun foo(): List<Int> = listOf(1, 2, 3)

fun main() {
    foo().forEach { value -> println(value) }
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt).

This code outputs:

```text
1
2
3
```

<!--- TEST -->

#### Sequences

If we are computing the numbers with some CPU-consuming blocking code
(each computation taking 100ms), then we can represent the numbers using a [Sequence]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    foo().forEach { value -> println(value) }
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt).

This code outputs the same numbers, but it waits 100ms before printing each one.

<!--- TEST
1
2
3
-->

#### Suspending functions

However, this computation blocks the main thread that is running the code.
When these values are computed by asynchronous code, we can mark the function `foo` with the `suspend` modifier,
so that it can perform its work without blocking and return the result as a list:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*

//sampleStart
suspend fun foo(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    foo().forEach { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt).

This code prints the numbers after waiting for a second.

<!--- TEST
1
2
3
-->

#### Flows

Using the `List<Int>` result type means we can only return all the values at once. To represent
the stream of values that are being asynchronously computed, we can use a [`Flow<Int>`][Flow] type, just as we would use the `Sequence<Int>` type for synchronously computed values:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    foo().collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).

This code waits 100ms before printing each number without blocking the main thread. This is verified
by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:

```text
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
```

<!--- TEST -->

Notice the following differences between the code with the [Flow] and the earlier examples:

* The builder function for the [Flow] type is called [flow].
* Code inside the `flow { ... }` builder block can suspend.
* The function `foo()` is no longer marked with the `suspend` modifier.
* Values are _emitted_ from the flow using the [emit][FlowCollector.emit] function.
* Values are _collected_ from the flow using the [collect][collect] function.

> We can replace [delay] with `Thread.sleep` in the body of `foo`'s `flow { ... }` and see that the main
thread is blocked in this case.

### Flows are cold

Flows are _cold_ streams similar to sequences &mdash; the code inside a [flow] builder does not
run until the flow is collected. This becomes clear in the following example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).

Which prints:

```text
Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
```

<!--- TEST -->

This is a key reason the `foo()` function (which returns a flow) is not marked with the `suspend` modifier.
By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
which is why we see "Flow started" when we call `collect` again.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300269
270### Flow cancellation
271
David.Watsonbb714c52019-08-30 17:49:42 +0200272Flow adheres to the general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300273additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be
David.Watsonbb714c52019-08-30 17:49:42 +0200274cancelled when the flow is suspended in a cancellable suspending function (like [delay]), and cannot be cancelled otherwise.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300275
David.Watsonbb714c52019-08-30 17:49:42 +0200276The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300277and stops executing its code:
278
279<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
280
281```kotlin
282import kotlinx.coroutines.*
283import kotlinx.coroutines.flow.*
284
285//sampleStart
286fun foo(): Flow<Int> = flow {
287 for (i in 1..3) {
288 delay(100)
289 println("Emitting $i")
290 emit(i)
291 }
292}
293
294fun main() = runBlocking<Unit> {
295 withTimeoutOrNull(250) { // Timeout after 250ms
296 foo().collect { value -> println(value) }
297 }
298 println("Done")
299}
300//sampleEnd
301```
302
303</div>
304
David.Watsonbb714c52019-08-30 17:49:42 +0200305> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300306
307Notice how only two numbers get emitted by the flow in `foo()` function, producing the following output:
308
309```text
310Emitting 1
3111
312Emitting 2
3132
314Done
315```
316
317<!--- TEST -->

### Flow builders

The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
easier declaration of flows:

* [flowOf] builder that defines a flow emitting a fixed set of values.
* Various collections and sequences can be converted to flows using `.asFlow()` extension functions.

So, the example that prints the numbers from 1 to 3 from a flow can be written as:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).

<!--- TEST
1
2
3
-->
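
The `flowOf` builder and the other `.asFlow()` conversions listed above can be sketched in the same way. This is a minimal illustration rather than one of the guide's tested samples: `builderResults` is a hypothetical helper name, and `toList` is used only to gather the emitted values so the three flows can be compared.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: builds the same flow of 1, 2, 3 in three ways
// and gathers the values of each flow into a list.
suspend fun builderResults(): List<List<Int>> = listOf(
    flowOf(1, 2, 3).toList(),             // fixed set of values
    listOf(1, 2, 3).asFlow().toList(),    // collection converted to a flow
    sequenceOf(1, 2, 3).asFlow().toList() // sequence converted to a flow
)

fun main() = runBlocking<Unit> {
    builderResults().forEach { println(it) }
}
```

Each of the three lines printed by `main` should be `[1, 2, 3]`, since all three builders describe the same values.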

### Intermediate flow operators

Flows can be transformed with operators, just as you would with collections and sequences.
Intermediate operators are applied to an upstream flow and return a downstream flow.
These operators are cold, just like flows are. A call to such an operator is not
a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The basic operators have familiar names like [map] and [filter].
The important difference from sequences is that blocks of
code inside these operators can call suspending functions.

For example, a flow of incoming requests can be
mapped to the results with the [map] operator, even when performing a request is a long-running
operation that is implemented by a suspending function:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).

It produces the following three lines, each appearing one second after the previous one:

```text
response 1
response 2
response 3
```

<!--- TEST -->
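
The same holds for `filter`: its predicate may also call suspending functions. The sketch below is an illustration under assumed names, not part of the guide's tested samples. `isAllowed` and `allowedResponses` are hypothetical, and `delay` merely stands in for an asynchronous check.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical suspending check; delay stands in for asynchronous work.
suspend fun isAllowed(request: Int): Boolean {
    delay(100)
    return request % 2 == 1 // only odd requests are allowed in this sketch
}

// Hypothetical helper that returns the responses for the allowed requests.
suspend fun allowedResponses(): List<String> =
    (1..4).asFlow()
        .filter { request -> isAllowed(request) } // the predicate can suspend
        .map { request -> "response $request" }
        .toList()

fun main() = runBlocking<Unit> {
    println(allowedResponses()) // prints [response 1, response 3]
}
```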

#### Transform operator

Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
simple transformations like [map] and [filter], as well as implement more complex transformations.
Using the `transform` operator, we can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.

For example, using `transform` we can emit a string before performing a long-running asynchronous request
and follow it with a response:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
//sampleStart
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).

The output of this code is:

```text
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
```

<!--- TEST -->
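
To make the claim that `transform` can imitate `map` and `filter` concrete, here is one possible sketch. The extension names `mapViaTransform` and `filterViaTransform` are hypothetical and chosen for illustration; this is not how the library itself is necessarily implemented.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical map look-alike: emit exactly one transformed value per input.
fun <T, R> Flow<T>.mapViaTransform(block: suspend (T) -> R): Flow<R> =
    transform { value -> emit(block(value)) }

// Hypothetical filter look-alike: emit the input only when the predicate holds.
fun <T> Flow<T>.filterViaTransform(predicate: suspend (T) -> Boolean): Flow<T> =
    transform { value -> if (predicate(value)) emit(value) }

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filterViaTransform { it % 2 == 0 }
        .mapViaTransform { "string $it" }
        .collect { println(it) } // prints "string 2" and then "string 4"
}
```

The difference between the two is only in how many times `emit` is called per input value: always once for the map look-alike, zero or one times for the filter look-alike.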

#### Size-limiting operators

Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management
functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).

The output of this code clearly shows that the execution of the `flow { ... }` body in the `numbers()` function
stopped after emitting the second number:

```text
1
2
Finally in numbers
```

<!--- TEST -->

### Terminal flow operators

Terminal operators on flows are _suspending functions_ that start a collection of the flow.
The [collect] operator is the most basic one, but there are other terminal operators that make common tasks more convenient:

* Conversion to various collections like [toList] and [toSet].
* Operators to get the [first] value and to ensure that a flow emits a [single] value.
* Reducing a flow to a value with [reduce] and [fold].

For example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).

This prints a single number:

```text
55
```

<!--- TEST -->
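
The other terminal operators listed above can be tried on the same flow of squares. The following is a minimal sketch, with `terminalResults` as a hypothetical helper name; because the flow is cold, each terminal operator starts its own collection of it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper that applies several terminal operators to one cold flow.
suspend fun terminalResults(): List<Any> {
    val squares = (1..5).asFlow().map { it * it } // nothing runs until a terminal operator is called
    return listOf(
        squares.toList(),                              // all values as a List
        squares.first(),                               // only the first value
        squares.fold(0) { acc, value -> acc + value }, // like reduce, but with an initial value
        flowOf(42).single()                            // requires a flow with exactly one value
    )
}

fun main() = runBlocking<Unit> {
    terminalResults().forEach { println(it) }
}
```

Running `main` should print `[1, 4, 9, 16, 25]`, `1`, `55`, and `42` on separate lines.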

### Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate
on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator.
No new coroutines are launched by default.
Each emitted value is processed by all the intermediate operators from
upstream to downstream and is then delivered to the terminal operator.

See the following example that filters the even integers and maps them to strings:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).

Producing:

```text
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
```

<!--- TEST -->

### Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is
a `foo` flow, then the following code runs in the context specified
by the author of this code, regardless of the implementation details of the `foo` flow:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
withContext(context) {
    foo.collect { value ->
        println(value) // run in the specified context
    }
}
```

</div>

<!--- CLEAR -->

This property of a flow is called _context preservation_.

So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector
of the corresponding flow. For example, consider the implementation of `foo` that prints the thread
it is called on and emits three numbers:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun foo(): Flow<Int> = flow {
    log("Started foo flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> log("Collected $value") }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).

Running this code produces:

```text
[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
```

<!--- TEST FLEXIBLE_THREAD -->

Since `foo().collect` is called from the main thread, the body of `foo`'s flow is also called in the main thread.
This is the perfect default for fast-running or asynchronous code that does not care about the execution context and
does not block the caller.

#### Wrong emission withContext

However, long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default], and UI-updating
code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
to change the context in code that uses Kotlin coroutines, but code in the `flow { ... }` builder has to honor the context
preservation property and is not allowed to [emit][FlowCollector.emit] from a different context.

Try running the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).

This code produces the following exception:

```text
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
        but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
        Please refer to 'flow' documentation or use 'flowOn' instead
    at ...
```

<!--- TEST EXCEPTION -->

#### flowOn operator

The exception refers to the [flowOn] function, which should be used to change the context of the flow emission.
The correct way to change the context of a flow is shown in the example below, which also prints the
names of the corresponding threads to show how it all works:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        log("Collected $value")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).

Notice how `flow { ... }` works in the background thread, while collection happens in the main thread:

<!--- TEST FLEXIBLE_THREAD
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
-->

Another thing to observe here is that the [flowOn] operator has changed the default sequential nature of the flow.
Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The [flowOn] operator
creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context.

### Buffering

Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes
to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
the emission by the `foo()` flow is slow, taking 100 ms to produce an element, and the collector is also slow,
taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        foo().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).

It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):

```text
1
2
3
Collected in 1220 ms
```

<!--- TEST ARBITRARY_TIME -->

We can use a [buffer] operator on a flow to run the emitting code of `foo()` concurrently with the collecting code,
as opposed to running them sequentially:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        foo()
            .buffer() // buffer emissions, don't wait
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).

It produces the same numbers, only faster, because we have effectively created a processing pipeline:
we wait 100 ms for the first number and then spend only 300 ms to process
each number. This way it takes around 1000 ms to run (100 ms plus three times 300 ms):

```text
1
2
3
Collected in 1071 ms
```

<!--- TEST ARBITRARY_TIME -->

> Note that the [flowOn] operator uses the same buffering mechanism when it has to change a [CoroutineDispatcher],
but here we explicitly request buffering without changing the execution context.

#### Conflation

When a flow represents partial results of an operation or operation status updates, it may not be necessary
to process each value, but only the most recent ones. In this case, the [conflate] operator can be used to skip
intermediate values when a collector is too slow to process them. Building on the previous example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        foo()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).

We see that while the first number was still being processed, the second and third were already produced, so
the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:

```text
1
3
Collected in 758 ms
```

<!--- TEST ARBITRARY_TIME -->
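
To compare the two operators side by side, this sketch collects the same `foo()` flow into a list, once with `buffer()` and once with `conflate()`. The helper names are hypothetical, and `onEach { delay(300) }` placed after the operator stands in for a slow collector.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

// Hypothetical helper: every emitted value is kept and processed in order
suspend fun bufferedValues(): List<Int> =
    foo().buffer().onEach { delay(300) }.toList()

// Hypothetical helper: values that arrive while the collector is busy may be dropped
suspend fun conflatedValues(): List<Int> =
    foo().conflate().onEach { delay(300) }.toList()

fun main() = runBlocking<Unit> {
    println("buffer:   ${bufferedValues()}")
    println("conflate: ${conflatedValues()}")
}
```

With these timings the buffered list holds all three numbers, while the conflated list is expected to start with 1 and end with 3. Exactly which intermediate values are skipped depends on timing.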

#### Processing the latest value

Conflation is one way to speed up processing when both the emitter and collector are slow. It does so by dropping emitted values.
The other way is to cancel a slow collector and restart it every time a new value is emitted. There is
a family of `xxxLatest` operators that perform the same essential logic of a `xxx` operator, but cancel the
code in their block on a new value. Let's try changing [conflate] to [collectLatest] in the previous example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        foo()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).

Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
is run on every value, but completes only for the last value:

```text
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
```

<!--- TEST ARBITRARY_TIME -->
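
`collectLatest` is one member of the `xxxLatest` family. As a sketch of another member, `mapLatest` applies the same cancel-and-restart logic to a transformation instead of to the collector. The helper name `latestResults` is hypothetical, and recent library versions may require the opt-in annotation shown here.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

// Hypothetical helper: the transformation is cancelled whenever a newer value arrives
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun latestResults(): List<String> =
    foo()
        .mapLatest { value ->
            delay(300) // pretend the transformation takes 300 ms
            "Done $value"
        }
        .toList()

fun main() = runBlocking<Unit> {
    println(latestResults())
}
```

Because each transformation takes longer than the gap between emissions, only the transformation of the last value is expected to finish.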

### Composing multiple flows

There are lots of ways to compose multiple flows.

#### Zip

Just like the [Sequence.zip] extension function in the Kotlin standard library,
flows have a [zip] operator that combines the corresponding values of two flows:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).

This example prints:

```text
1 -> one
2 -> two
3 -> three
```

<!--- TEST -->
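
One detail worth seeing in code is what happens when the two flows have different lengths: the zipped flow completes as soon as either input completes. A minimal sketch, with the hypothetical helper `zipUneven`:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: zips five numbers with only three strings
suspend fun zipUneven(): List<String> {
    val nums = (1..5).asFlow() // five numbers
    val strs = flowOf("one", "two", "three") // only three strings
    return nums.zip(strs) { a, b -> "$a -> $b" }.toList()
}

fun main() = runBlocking<Unit> {
    zipUneven().forEach { println(it) } // the unmatched numbers 4 and 5 never appear
}
```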

#### Combine

When a flow represents the most recent value of a variable or operation (see also the related
section on [conflation](#conflation)), we may need to perform a computation that depends on
the most recent values of the corresponding flows and to recompute it whenever any of the upstream
flows emits a value. The corresponding family of operators is called [combine].

For example, if the numbers in the previous example update every 300 ms, but the strings update every 400 ms,
then zipping them using the [zip] operator will still produce the same result,
albeit with results that are printed every 400 ms:

> We use the [onEach] intermediate operator in this example to delay each element and make the code
that emits sample flows more declarative and shorter.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).

<!--- TEST ARBITRARY_TIME
1 -> one at 437 ms from start
2 -> two at 837 ms from start
3 -> three at 1243 ms from start
-->

However, when using a [combine] operator here instead of a [zip]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).

We get quite a different output, where a line is printed at each emission from either the `nums` or the `strs` flow:

```text
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
```

<!--- TEST ARBITRARY_TIME -->
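
The timestamps in both outputs can be predicted with a little arithmetic. The following plain-Kotlin sketch does not use the flow API at all. It is an idealized model that assumes every delay is exact and ignores scheduling overhead, and all names in it are hypothetical.

```kotlin
// Idealized emission schedule (ms from start), assuming exact delays
val numTimes = listOf(300L to 1, 600L to 2, 900L to 3)
val strTimes = listOf(400L to "one", 800L to "two", 1200L to "three")

// zip pairs the i-th values, so each pair is ready when the slower of the two arrives
fun zipTimeline(): List<String> =
    numTimes.zip(strTimes) { (tn, n), (ts, s) -> "$n -> $s at ${maxOf(tn, ts)} ms" }

// One emission from either side; exactly one of num/str is set
data class Emission(val time: Long, val num: Int? = null, val str: String? = null)

// combine produces a result on every emission once both sides have a value
fun combineTimeline(): List<String> {
    val emissions = (numTimes.map { (t, n) -> Emission(t, num = n) } +
        strTimes.map { (t, s) -> Emission(t, str = s) }).sortedBy { it.time }
    var lastNum: Int? = null
    var lastStr: String? = null
    val result = mutableListOf<String>()
    for (e in emissions) {
        if (e.num != null) lastNum = e.num
        if (e.str != null) lastStr = e.str
        if (lastNum != null && lastStr != null) result += "$lastNum -> $lastStr at ${e.time} ms"
    }
    return result
}

fun main() {
    println("zip:")
    zipTimeline().forEach { println(it) }
    println("combine:")
    combineTimeline().forEach { println(it) }
}
```

The model yields 400, 800 and 1200 ms for `zip`, and 400, 600, 800, 900 and 1200 ms for `combine`, which is in line with the measured outputs shown above once a few dozen milliseconds of overhead are added.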

### Flattening flows

Flows represent asynchronously received sequences of values, so it is quite easy to get into a situation where
each value triggers a request for another sequence of values. For example, we can have the following
function that returns a flow of two strings 500 ms apart:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}
```

</div>

<!--- CLEAR -->

Now if we have a flow of three integers and call `requestFlow` for each of them like this:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
(1..3).asFlow().map { requestFlow(it) }
```

</div>

<!--- CLEAR -->

Then we end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for
further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
operators for this. However, due to the asynchronous nature of flows, they call for different _modes_ of flattening,
so there is a family of flattening operators on flows.

#### flatMapConcat

Concatenating mode is implemented by [flatMapConcat] and [flattenConcat] operators. They are the most direct
analogues of the corresponding sequence operators. They wait for the inner flow to complete before
starting to collect the next one, as the following example shows:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).

The sequential nature of [flatMapConcat] is clearly seen in the output:

```text
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
```

<!--- TEST ARBITRARY_TIME -->
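
Since both `flatMapConcat` and `flattenConcat` implement the concatenating mode, a `map` followed by `flattenConcat` should yield the same values in the same order as `flatMapConcat`. The sketch below checks this. The two helper names are hypothetical, and the opt-in annotations are only there because recent library versions mark these operators as preview or experimental.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

// Hypothetical helper: one-step flattening
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
suspend fun viaFlatMapConcat(): List<String> =
    (1..3).asFlow().flatMapConcat { requestFlow(it) }.toList()

// Hypothetical helper: build a Flow<Flow<String>> first, then flatten it
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
suspend fun viaFlattenConcat(): List<String> =
    (1..3).asFlow().map { requestFlow(it) }.flattenConcat().toList()

fun main() = runBlocking<Unit> {
    println(viaFlatMapConcat())
    println(viaFlattenConcat())
}
```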

#### flatMapMerge

Another flattening mode is to concurrently collect all the incoming flows and merge their values into
a single flow so that values are emitted as soon as possible.
It is implemented by [flatMapMerge] and [flattenMerge] operators. They both accept an optional
`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
(it is equal to [DEFAULT_CONCURRENCY] by default).

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).

The concurrent nature of [flatMapMerge] is obvious:

```text
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapMerge] calls its block of code (`{ requestFlow(it) }` in this example) sequentially, but
collects the resulting flows concurrently. It is equivalent to performing a sequential
`map { requestFlow(it) }` first and then calling [flattenMerge] on the result.
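
The effect of the `concurrency` parameter is easiest to see at its lower bound. In this sketch, which uses hypothetical helper names, a limit of 1 allows only one inner flow to be collected at a time, so the result should come out in the same order as with concatenation, while the default limit lets the inner flows interleave.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

// Hypothetical helper: at most one inner flow is collected at a time
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
suspend fun mergedOneAtATime(): List<String> =
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge(concurrency = 1) { requestFlow(it) }
        .toList()

// Hypothetical helper: default concurrency, inner flows are collected concurrently
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
suspend fun mergedConcurrently(): List<String> =
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .toList()

fun main() = runBlocking<Unit> {
    println("concurrency = 1: ${mergedOneAtATime()}")
    println("default:         ${mergedConcurrently()}") // order depends on timing
}
```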

#### flatMapLatest

In a similar way to the [collectLatest] operator, which was shown in the
["Processing the latest value"](#processing-the-latest-value) section, there is a corresponding "Latest"
flattening mode where the collection of the previous flow is cancelled as soon as a new flow is emitted.
It is implemented by the [flatMapLatest] operator.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).

The output of this example is a good demonstration of how [flatMapLatest] works:

```text
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) on a new value.
It makes no difference in this particular example, because the call to `requestFlow` itself is fast, non-suspending,
and cannot be cancelled. However, it would show up if we were to use suspending functions like `delay` in there.
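
To make the last point concrete, here is a sketch in which the block itself suspends before returning the inner flow. The 300 ms `delay` is a hypothetical stand-in for slow work, and `latestWithSlowBlock` is a name made up for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

// Hypothetical helper: the block suspends, so it can be cancelled before it returns a flow
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun latestWithSlowBlock(): List<String> =
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { value ->
            delay(300) // slow work inside the block; cancelled when a newer value arrives
            requestFlow(value)
        }
        .toList()

fun main() = runBlocking<Unit> {
    latestWithSlowBlock().forEach { println(it) }
}
```

With these timings the blocks for 1 and 2 are cancelled while still suspended in `delay`, so not even their "First" strings are expected to appear. Only the flow requested for 3 is collected.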

### Flow exceptions

Flow collection can complete with an exception when an emitter or code inside the operators throws an exception.
There are several ways to handle these exceptions.

#### Collector try and catch

A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).

This code successfully catches an exception in the [collect] terminal operator and,
as we see, no more values are emitted after that:

```text
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
```

<!--- TEST -->

#### Everything is caught

The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators.
For example, let's change the code so that emitted values are [mapped][map] to strings,
but the corresponding code produces an exception:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).

This exception is still caught and collection is stopped:

```text
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
```

<!--- TEST -->

### Exception transparency

But how can the code of the emitter encapsulate its exception handling behavior?

Flows must be _transparent to exceptions_ and it is a violation of exception transparency to [emit][FlowCollector.emit] values in the
`flow { ... }` builder from inside of a `try/catch` block. This guarantees that a collector throwing an exception
can always catch it using `try/catch` as in the previous example.

The emitter can use a [catch] operator that preserves this exception transparency and allows encapsulation
of its exception handling. The body of the `catch` operator can analyze an exception
and react to it in different ways depending on which exception was caught:

* Exceptions can be rethrown using `throw`.
* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
* Exceptions can be ignored, logged, or processed by some other code.
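
The three reactions listed above can be combined in a single `catch` body. The following is only a sketch: the mapping from exception type to reaction is an arbitrary policy invented for this example, and `failingFlow` and `withRecovery` are hypothetical names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical source: emits one value and then fails with the given exception
fun failingFlow(failure: Throwable): Flow<Int> = flow {
    emit(1)
    throw failure
}

// Hypothetical policy, chosen only to show each of the three reactions
fun Flow<Int>.withRecovery(): Flow<Int> = catch { e ->
    when (e) {
        is IllegalStateException -> emit(-1) // turn the exception into a value
        is IllegalArgumentException -> println("Ignoring $e") // log and ignore
        else -> throw e // rethrow everything else
    }
}

fun main() = runBlocking<Unit> {
    println(failingFlow(IllegalStateException("state")).withRecovery().toList())
    println(failingFlow(IllegalArgumentException("argument")).withRecovery().toList())
    try {
        failingFlow(UnsupportedOperationException("other")).withRecovery().toList()
    } catch (e: UnsupportedOperationException) {
        println("Rethrown: $e")
    }
}
```

In the first case the collector receives a fallback value after 1, in the second the flow simply ends after 1, and in the third the exception reaches the caller as if `catch` were not there.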

For example, let us emit the text on catching an exception:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).

The output of the example is the same, even though we do not have `try/catch` around the code anymore.

<!--- TEST
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
-->

#### Transparent catch

The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
(that is, exceptions from all the operators above `catch`, but not below it).
If the block in `collect { ... }` (placed below `catch`) throws an exception, then it escapes:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).

A "Caught ..." message is not printed despite there being a `catch` operator:

<!--- TEST EXCEPTION
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
 at ...
-->

#### Catching declaratively

We can combine the declarative nature of the [catch] operator with a desire to handle all the exceptions, by moving the body
of the [collect] operator into [onEach] and putting it before the `catch` operator. Collection of this flow must
be triggered by a call to `collect()` without parameters:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).

Now we can see that a "Caught ..." message is printed and so we can catch all the exceptions without explicitly
using a `try/catch` block:

<!--- TEST EXCEPTION
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
-->

### Flow completion

When flow collection completes (normally or exceptionally) it may need to execute an action.
As you may have already noticed, it can be done in two ways: imperative or declarative.

#### Imperative finally block

In addition to `try`/`catch`, a collector can also use a `finally` block to execute an action
upon `collect` completion.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).

This code prints three numbers produced by the `foo()` flow followed by a "Done" string:

```text
1
2
3
Done
```

<!--- TEST -->

#### Declarative handling

For the declarative approach, flow has an [onCompletion] intermediate operator that is invoked
when the flow has been completely collected.

The previous example can be rewritten using an [onCompletion] operator and produces the same output:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
//sampleStart
    foo()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
//sampleEnd
}
```
</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).

<!--- TEST
1
2
3
Done
-->

The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
to determine whether the flow collection was completed normally or exceptionally. In the following
example the `foo()` flow throws an exception after emitting the number 1:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
//sampleEnd
```
</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).

As you may expect, it prints:

```text
1
Flow completed exceptionally
Caught exception
```

<!--- TEST -->

The [onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
and can be handled with a `catch` operator.
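
The position of `onCompletion` relative to `catch` therefore matters. In this sketch, with the hypothetical helper `completionLog`, one `onCompletion` is placed before `catch` and another after it. The one upstream of `catch` should receive the exception, while the one downstream should see a normal completion because the exception has already been handled.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

// Hypothetical helper: records what each operator observes, in order
suspend fun completionLog(): List<String> {
    val log = mutableListOf<String>()
    foo()
        .onCompletion { cause -> log += "before catch: failed=${cause != null}" }
        .catch { log += "caught" }
        .onCompletion { cause -> log += "after catch: failed=${cause != null}" }
        .collect { value -> log += "value $value" }
    return log
}

fun main() = runBlocking<Unit> {
    completionLog().forEach { println(it) }
}
```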

#### Upstream exceptions only

Just like the [catch] operator, [onCompletion] only sees exceptions coming from upstream and does not
see downstream exceptions. For example, run the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    foo()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).

We can see the completion cause is null, yet collection failed with an exception:

```text
1
Flow completed with null
Exception in thread "main" java.lang.IllegalStateException: Collected 2
```

<!--- TEST EXCEPTION -->

### Imperative versus declarative

Now we know how to collect a flow and handle its completion and exceptions in both imperative and declarative ways.
The natural question here is, which approach is preferred and why?
As a library, we do not advocate for any particular approach and believe that both options
are valid and should be selected according to your own preferences and code style.

### Launching flow

It is easy to use flows to represent asynchronous events that are coming from some source.
In this case, we need an analogue of the `addEventListener` function that registers a piece of code with a reaction
for incoming events and continues further work. The [onEach] operator can serve this role.
However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow.
Otherwise, just calling `onEach` has no effect.

If we use the [collect] terminal operator after `onEach`, then the code after it will wait until the flow is collected:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).

As you can see, it prints:

```text
Event: 1
Event: 2
Event: 3
Done
```

<!--- TEST -->
1727
The [launchIn] terminal operator comes in handy here. By replacing `collect` with `launchIn` we can
launch the collection of the flow in a separate coroutine, so that execution of further code
immediately continues:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).

It prints:

```text
Done
Event: 1
Event: 2
Event: 3
```

<!--- TEST -->

The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is
launched. In the above example this scope comes from the [runBlocking]
coroutine builder, so while the flow is running, this [runBlocking] scope waits for completion of its child coroutine
and keeps the main function from returning and terminating this example.

In actual applications a scope will come from an entity with a limited
lifetime. As soon as the lifetime of this entity is terminated, the corresponding scope is cancelled, cancelling
the collection of the corresponding flow. This way, the `onEach { ... }.launchIn(scope)` pair works
like `addEventListener`. However, there is no need for the corresponding `removeEventListener` function,
as cancellation and structured concurrency serve this purpose.

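A minimal sketch of such an entity might look as follows. The `EventScreen` class, its `start` and `close` methods, and the endless `ticks` flow are all hypothetical names invented for this illustration. In a real application the scope would typically be provided by the framework or component that owns the lifetime.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical component with a limited lifetime; it owns a scope that lives as long as it does
class EventScreen {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Plays the role of addEventListener: registers a reaction and returns immediately
    fun start(events: Flow<Int>): Job =
        events
            .onEach { event -> println("Event: $event") }
            .launchIn(scope)

    // End of the lifetime: cancelling the scope cancels every collection launched in it
    fun close() = scope.cancel()
}

fun main() = runBlocking<Unit> {
    // An endless flow of events that would never complete on its own
    val ticks = flow {
        var i = 0
        while (true) {
            delay(100)
            emit(++i)
        }
    }
    val screen = EventScreen()
    val job = screen.start(ticks)
    delay(350)     // let a few events arrive
    screen.close() // no removeEventListener analogue is needed
    job.join()     // the collection stops even though the flow itself is endless
    println("Collection cancelled: ${job.isCancelled}")
}
```
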
Note that [launchIn] also returns a [Job], which can be used either to [cancel][Job.cancel] only the corresponding
flow collection coroutine, without cancelling the whole scope, or to [join][Job.join] it.

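As an illustration, the sketch below cancels a single collection through its `Job` while the enclosing `runBlocking` scope keeps running. The `secondEventSeen` signal is a hypothetical helper, used only to make the moment of cancellation deterministic in this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    // Hypothetical signal, completed once the second event has been handled
    val secondEventSeen = CompletableDeferred<Unit>()
    val job: Job = events()
        .onEach { event ->
            println("Event: $event")
            if (event == 2) secondEventSeen.complete(Unit)
        }
        .launchIn(this)
    secondEventSeen.await() // suspend until the second event has been handled
    job.cancel()            // cancel only the collecting coroutine, not the runBlocking scope
    job.join()              // wait until the collection has actually stopped
    println("Collection cancelled: ${job.isCancelled}")
    // Prints "Event: 1", "Event: 2" and then "Collection cancelled: true"
}
```
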
### Flow and Reactive Streams

For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and Project Reactor,
the design of Flow may look very familiar.

Indeed, its design was inspired by Reactive Streams and its various implementations. But the main goal of Flow is to have as simple a design as possible,
be Kotlin- and suspension-friendly, and respect structured concurrency. Achieving this goal would be impossible without the reactive pioneers and their tremendous work. You can read the complete story in the [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article.

While being different, conceptually, Flow *is* a reactive stream, and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa.
Such converters are provided by `kotlinx.coroutines` out of the box and can be found in the corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2`/`kotlinx-coroutines-rx3` for RxJava2/RxJava3).
Integration modules include conversions from and to `Flow`, integration with Reactor's `Context` and suspension-friendly ways to work with various reactive entities.

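As a rough sketch of these conversions, the example below round-trips a flow through a Reactive Streams `Publisher`. It assumes that the `kotlinx-coroutines-reactive` module is on the classpath, and `roundTrip` is a hypothetical helper written only for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical helper: converts a flow to a Reactive Streams Publisher and back again
fun roundTrip(source: Flow<Int>): Flow<Int> {
    val publisher: Publisher<Int> = source.asPublisher() // Flow -> Publisher
    return publisher.asFlow()                            // Publisher -> Flow
}

fun main() = runBlocking<Unit> {
    println(roundTrip(flowOf(1, 2, 3)).toList()) // prints [1, 2, 3]
}
```
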
<!-- stdlib references -->

[collections]: https://kotlinlang.org/docs/reference/collections-overview.html
[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/index.html
[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/index.html
[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Dispatchers.Main]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
<!--- INDEX kotlinx.coroutines.flow -->
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
[FlowCollector.emit]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
[collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
[flowOf]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
[map]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
[filter]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
[transform]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
[take]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
[toList]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
[toSet]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
[first]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
[single]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
[reduce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
[fold]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
[flowOn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
[buffer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
[conflate]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
[collectLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
[zip]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
[combine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
[onEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
[flatMapConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
[flattenConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
[flatMapMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
[flattenMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
[DEFAULT_CONCURRENCY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
[flatMapLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
<!--- END -->