blob: 2a3ffb598b8525c4147bbf597fb7f41926e383f6 [file] [log] [blame] [view]
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
2/*
3 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
4 */
5
6// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
7package kotlinx.coroutines.guide.$$1$$2
8-->
9<!--- KNIT ../kotlinx-coroutines-core/jvm/test/guide/.*-##\.kt -->
10<!--- TEST_OUT ../kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt
11// This file was automatically generated from flow.md by Knit tool. Do not edit.
12package kotlinx.coroutines.guide.test
13
14import org.junit.Test
15
16class FlowGuideTest {
17-->
18
19**Table of contents**
20
21<!--- TOC -->
22
23* [Asynchronous Flow](#asynchronous-flow)
24 * [Representing multiple values](#representing-multiple-values)
25 * [Sequences](#sequences)
26 * [Suspending functions](#suspending-functions)
27 * [Flows](#flows)
28 * [Flows are cold](#flows-are-cold)
29 * [Flow cancellation](#flow-cancellation)
30 * [Flow builders](#flow-builders)
31 * [Intermediate flow operators](#intermediate-flow-operators)
32 * [Transform operator](#transform-operator)
33 * [Size-limiting operators](#size-limiting-operators)
34 * [Terminal flow operators](#terminal-flow-operators)
35 * [Flows are sequential](#flows-are-sequential)
36 * [Flow context](#flow-context)
37 * [Wrong emission withContext](#wrong-emission-withcontext)
38 * [flowOn operator](#flowon-operator)
39 * [Buffering](#buffering)
40 * [Conflation](#conflation)
41 * [Processing the latest value](#processing-the-latest-value)
42 * [Composing multiple flows](#composing-multiple-flows)
43 * [Zip](#zip)
44 * [Combine](#combine)
45 * [Flattening flows](#flattening-flows)
46 * [flatMapConcat](#flatmapconcat)
47 * [flatMapMerge](#flatmapmerge)
48 * [flatMapLatest](#flatmaplatest)
49 * [Flow exceptions](#flow-exceptions)
50 * [Collector try and catch](#collector-try-and-catch)
51 * [Everything is caught](#everything-is-caught)
52 * [Exception transparency](#exception-transparency)
53 * [Transparent catch](#transparent-catch)
54 * [Catching declaratively](#catching-declaratively)
55 * [Flow completion](#flow-completion)
56 * [Imperative finally block](#imperative-finally-block)
57 * [Declarative handling](#declarative-handling)
58 * [Upstream exceptions only](#upstream-exceptions-only)
59 * [Imperative versus declarative](#imperative-versus-declarative)
60 * [Launching flow](#launching-flow)
61
62<!--- END_TOC -->
63
64## Asynchronous Flow
65
David.Watsonbb714c52019-08-30 17:49:42 +020066Suspending functions asynchronously returns a single value, but how can we return
67multiple asynchronously computed values? This is where Kotlin Flows come in.
Roman Elizarov3258e1f2019-08-22 20:08:48 +030068
69### Representing multiple values
70
71Multiple values can be represented in Kotlin using [collections].
72For example, we can have a function `foo()` that returns a [List]
David.Watsonbb714c52019-08-30 17:49:42 +020073of three numbers and then print them all using [forEach]:
Roman Elizarov3258e1f2019-08-22 20:08:48 +030074
75<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
76
77```kotlin
78fun foo(): List<Int> = listOf(1, 2, 3)
79
80fun main() {
81 foo().forEach { value -> println(value) }
82}
83```
84
85</div>
86
David.Watsonbb714c52019-08-30 17:49:42 +020087> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +030088
89This code outputs:
90
91```text
921
932
943
95```
96
97<!--- TEST -->
98
99#### Sequences
100
David.Watsonbb714c52019-08-30 17:49:42 +0200101If we are computing the numbers with some CPU-consuming blocking code
102(each computation taking 100ms), then we can represent the numbers using a [Sequence]:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300103
104<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
105
106```kotlin
107fun foo(): Sequence<Int> = sequence { // sequence builder
108 for (i in 1..3) {
109 Thread.sleep(100) // pretend we are computing it
110 yield(i) // yield next value
111 }
112}
113
114fun main() {
115 foo().forEach { value -> println(value) }
116}
117```
118
119</div>
120
David.Watsonbb714c52019-08-30 17:49:42 +0200121> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300122
123This code outputs the same numbers, but it waits 100ms before printing each one.
124
125<!--- TEST
1261
1272
1283
129-->
130
131#### Suspending functions
132
133However, this computation blocks the main thread that is running the code.
David.Watsonbb714c52019-08-30 17:49:42 +0200134When these values are computed by asynchronous code we can mark the function `foo` with a `suspend` modifier,
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300135so that it can perform its work without blocking and return the result as a list:
136
137<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
138
139```kotlin
140import kotlinx.coroutines.*
141
142//sampleStart
143suspend fun foo(): List<Int> {
144 delay(1000) // pretend we are doing something asynchronous here
145 return listOf(1, 2, 3)
146}
147
148fun main() = runBlocking<Unit> {
149 foo().forEach { value -> println(value) }
150}
151//sampleEnd
152```
153
154</div>
155
David.Watsonbb714c52019-08-30 17:49:42 +0200156> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300157
158This code prints the numbers after waiting for a second.
159
160<!--- TEST
1611
1622
1633
164-->
165
166#### Flows
167
David.Watsonbb714c52019-08-30 17:49:42 +0200168Using the `List<Int>` result type, means we can only return all the values at once. To represent
169the stream of values that are being asynchronously computed, we can use a [`Flow<Int>`][Flow] type just like we would the `Sequence<Int>` type for synchronously computed values:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300170
171<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
172
173```kotlin
174import kotlinx.coroutines.*
175import kotlinx.coroutines.flow.*
176
177//sampleStart
178fun foo(): Flow<Int> = flow { // flow builder
179 for (i in 1..3) {
180 delay(100) // pretend we are doing something useful here
181 emit(i) // emit next value
182 }
183}
184
185fun main() = runBlocking<Unit> {
David.Watsonbb714c52019-08-30 17:49:42 +0200186 // Launch a concurrent coroutine to check if the main thread is blocked
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300187 launch {
188 for (k in 1..3) {
189 println("I'm not blocked $k")
190 delay(100)
191 }
192 }
193 // Collect the flow
194 foo().collect { value -> println(value) }
195}
196//sampleEnd
197```
198
199</div>
200
David.Watsonbb714c52019-08-30 17:49:42 +0200201> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300202
203This code waits 100ms before printing each number without blocking the main thread. This is verified
204by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:
205
206```text
207I'm not blocked 1
2081
209I'm not blocked 2
2102
211I'm not blocked 3
2123
213```
214
215<!--- TEST -->
216
David.Watsonbb714c52019-08-30 17:49:42 +0200217Notice the following differences in the code with the [Flow] from the earlier examples:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300218
219* A builder function for [Flow] type is called [flow].
220* Code inside the `flow { ... }` builder block can suspend.
221* The function `foo()` is no longer marked with `suspend` modifier.
222* Values are _emitted_ from the flow using [emit][FlowCollector.emit] function.
223* Values are _collected_ from the flow using [collect][collect] function.
224
David.Watsonbb714c52019-08-30 17:49:42 +0200225> We can replace [delay] with `Thread.sleep` in the body of `foo`'s `flow { ... }` and see that the main
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300226thread is blocked in this case.
227
228### Flows are cold
229
David.Watsonbb714c52019-08-30 17:49:42 +0200230Flows are _cold_ streams similar to sequences &mdash; the code inside a [flow] builder does not
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300231run until the flow is collected. This becomes clear in the following example:
232
233<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
234
235```kotlin
236import kotlinx.coroutines.*
237import kotlinx.coroutines.flow.*
238
239//sampleStart
240fun foo(): Flow<Int> = flow {
241 println("Flow started")
242 for (i in 1..3) {
243 delay(100)
244 emit(i)
245 }
246}
247
248fun main() = runBlocking<Unit> {
249 println("Calling foo...")
250 val flow = foo()
251 println("Calling collect...")
252 flow.collect { value -> println(value) }
253 println("Calling collect again...")
254 flow.collect { value -> println(value) }
255}
256//sampleEnd
257```
258
259</div>
260
David.Watsonbb714c52019-08-30 17:49:42 +0200261> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300262
263Which prints:
264
265```text
266Calling foo...
267Calling collect...
268Flow started
2691
2702
2713
272Calling collect again...
273Flow started
2741
2752
2763
277```
278
279<!--- TEST -->
280
David.Watsonbb714c52019-08-30 17:49:42 +0200281This is a key reason the `foo()` function (which returns a flow) is not marked with `suspend` modifier.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300282By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
David.Watsonbb714c52019-08-30 17:49:42 +0200283that is why we see "Flow started" when we call `collect` again.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300284
285### Flow cancellation
286
David.Watsonbb714c52019-08-30 17:49:42 +0200287Flow adheres to the general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300288additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be
David.Watsonbb714c52019-08-30 17:49:42 +0200289cancelled when the flow is suspended in a cancellable suspending function (like [delay]), and cannot be cancelled otherwise.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300290
David.Watsonbb714c52019-08-30 17:49:42 +0200291The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300292and stops executing its code:
293
294<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
295
296```kotlin
297import kotlinx.coroutines.*
298import kotlinx.coroutines.flow.*
299
300//sampleStart
301fun foo(): Flow<Int> = flow {
302 for (i in 1..3) {
303 delay(100)
304 println("Emitting $i")
305 emit(i)
306 }
307}
308
309fun main() = runBlocking<Unit> {
310 withTimeoutOrNull(250) { // Timeout after 250ms
311 foo().collect { value -> println(value) }
312 }
313 println("Done")
314}
315//sampleEnd
316```
317
318</div>
319
David.Watsonbb714c52019-08-30 17:49:42 +0200320> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300321
322Notice how only two numbers get emitted by the flow in `foo()` function, producing the following output:
323
324```text
325Emitting 1
3261
327Emitting 2
3282
329Done
330```
331
332<!--- TEST -->
333
334### Flow builders
335
336The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
David.Watsonbb714c52019-08-30 17:49:42 +0200337easier declaration of flows:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300338
339* [flowOf] builder that defines a flow emitting a fixed set of values.
340* Various collections and sequences can be converted to flows using `.asFlow()` extension functions.
341
David.Watsonbb714c52019-08-30 17:49:42 +0200342So, the example that prints the numbers from 1 to 3 from a flow can be written as:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300343
344<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
345
346```kotlin
347import kotlinx.coroutines.*
348import kotlinx.coroutines.flow.*
349
350fun main() = runBlocking<Unit> {
351//sampleStart
352 // Convert an integer range to a flow
353 (1..3).asFlow().collect { value -> println(value) }
354//sampleEnd
355}
356```
357
358</div>
359
David.Watsonbb714c52019-08-30 17:49:42 +0200360> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300361
362<!--- TEST
3631
3642
3653
366-->
367
368### Intermediate flow operators
369
David.Watsonbb714c52019-08-30 17:49:42 +0200370Flows can be transformed with operators, just as you would with collections and sequences.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300371Intermediate operators are applied to an upstream flow and return a downstream flow.
372These operators are cold, just like flows are. A call to such an operator is not
373a suspending function itself. It works quickly, returning the definition of a new transformed flow.
374
375The basic operators have familiar names like [map] and [filter].
David.Watsonbb714c52019-08-30 17:49:42 +0200376The important difference to sequences is that blocks of
377code inside these operators can call suspending functions.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300378
379For example, a flow of incoming requests can be
David.Watsonbb714c52019-08-30 17:49:42 +0200380mapped to the results with the [map] operator, even when performing a request is a long-running
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300381operation that is implemented by a suspending function:
382
383<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
384
385```kotlin
386import kotlinx.coroutines.*
387import kotlinx.coroutines.flow.*
388
389//sampleStart
390suspend fun performRequest(request: Int): String {
391 delay(1000) // imitate long-running asynchronous work
392 return "response $request"
393}
394
395fun main() = runBlocking<Unit> {
396 (1..3).asFlow() // a flow of requests
397 .map { request -> performRequest(request) }
398 .collect { response -> println(response) }
399}
400//sampleEnd
401```
402
403</div>
404
David.Watsonbb714c52019-08-30 17:49:42 +0200405> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300406
David.Watsonbb714c52019-08-30 17:49:42 +0200407It produces the following three lines, each line appearing after each second:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300408
409```text
410response 1
411response 2
412response 3
413```
414
415<!--- TEST -->
416
417#### Transform operator
418
419Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
David.Watsonbb714c52019-08-30 17:49:42 +0200420simple transformations like [map] and [filter], as well as implement more complex transformations.
421Using the `transform` operator, we can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300422
423For example, using `transform` we can emit a string before performing a long-running asynchronous request
424and follow it with a response:
425
426<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
427
428```kotlin
429import kotlinx.coroutines.*
430import kotlinx.coroutines.flow.*
431
432suspend fun performRequest(request: Int): String {
433 delay(1000) // imitate long-running asynchronous work
434 return "response $request"
435}
436
437fun main() = runBlocking<Unit> {
438//sampleStart
439 (1..3).asFlow() // a flow of requests
440 .transform { request ->
441 emit("Making request $request")
442 emit(performRequest(request))
443 }
444 .collect { response -> println(response) }
445//sampleEnd
446}
447```
448
449</div>
450
David.Watsonbb714c52019-08-30 17:49:42 +0200451> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300452
453The output of this code is:
454
455```text
456Making request 1
457response 1
458Making request 2
459response 2
460Making request 3
461response 3
462```
463
464<!--- TEST -->
465
466#### Size-limiting operators
467
468Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
David.Watsonbb714c52019-08-30 17:49:42 +0200469is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300470functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation:
471
472<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
473
474```kotlin
475import kotlinx.coroutines.*
476import kotlinx.coroutines.flow.*
477
478//sampleStart
479fun numbers(): Flow<Int> = flow {
480 try {
481 emit(1)
482 emit(2)
483 println("This line will not execute")
484 emit(3)
485 } finally {
486 println("Finally in numbers")
487 }
488}
489
490fun main() = runBlocking<Unit> {
491 numbers()
492 .take(2) // take only the first two
493 .collect { value -> println(value) }
494}
495//sampleEnd
496```
497
498</div>
499
David.Watsonbb714c52019-08-30 17:49:42 +0200500> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300501
David.Watsonbb714c52019-08-30 17:49:42 +0200502The output of this code clearly shows that the execution of the `flow { ... }` body in the `numbers()` function
503stopped after emitting the second number:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300504
505```text
5061
5072
508Finally in numbers
509```
510
511<!--- TEST -->
512
513### Terminal flow operators
514
515Terminal operators on flows are _suspending functions_ that start a collection of the flow.
David.Watsonbb714c52019-08-30 17:49:42 +0200516The [collect] operator is the most basic one, but there are other terminal operators, which can make it easier:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300517
518* Conversion to various collections like [toList] and [toSet].
519* Operators to get the [first] value and to ensure that a flow emits a [single] value.
520* Reducing a flow to a value with [reduce] and [fold].
521
522For example:
523
524<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
525
526```kotlin
527import kotlinx.coroutines.*
528import kotlinx.coroutines.flow.*
529
530fun main() = runBlocking<Unit> {
531//sampleStart
532 val sum = (1..5).asFlow()
533 .map { it * it } // squares of numbers from 1 to 5
534 .reduce { a, b -> a + b } // sum them (terminal operator)
535 println(sum)
536//sampleEnd
537}
538```
539
540</div>
541
David.Watsonbb714c52019-08-30 17:49:42 +0200542> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300543
544Prints a single number:
545
546```text
54755
548```
549
550<!--- TEST -->
551
552### Flows are sequential
553
554Each individual collection of a flow is performed sequentially unless special operators that operate
555on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator.
556No new coroutines are launched by default.
David.Watsonbb714c52019-08-30 17:49:42 +0200557Each emitted value is processed by all the intermediate operators from
558upstream to downstream and is then delivered to the terminal operator after.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300559
David.Watsonbb714c52019-08-30 17:49:42 +0200560See the following example that filters the even integers and maps them to strings:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300561
562<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
563
564```kotlin
565import kotlinx.coroutines.*
566import kotlinx.coroutines.flow.*
567
568fun main() = runBlocking<Unit> {
569//sampleStart
570 (1..5).asFlow()
571 .filter {
572 println("Filter $it")
573 it % 2 == 0
574 }
575 .map {
576 println("Map $it")
577 "string $it"
578 }.collect {
579 println("Collect $it")
580 }
581//sampleEnd
582}
583```
584
585</div>
586
David.Watsonbb714c52019-08-30 17:49:42 +0200587> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300588
589Producing:
590
591```text
592Filter 1
593Filter 2
594Map 2
595Collect string 2
596Filter 3
597Filter 4
598Map 4
599Collect string 4
600Filter 5
601```
602
603<!--- TEST -->
604
605### Flow context
606
607Collection of a flow always happens in the context of the calling coroutine. For example, if there is
608a `foo` flow, then the following code runs in the context specified
David.Watsonbb714c52019-08-30 17:49:42 +0200609by the author of this code, regardless of the implementation details of the `foo` flow:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300610
611<div class="sample" markdown="1" theme="idea" data-highlight-only>
612
613```kotlin
614withContext(context) {
615 foo.collect { value ->
616 println(value) // run in the specified context
617 }
618}
619```
620
621</div>
622
623<!--- CLEAR -->
624
625This property of a flow is called _context preservation_.
626
627So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector
628of the corresponding flow. For example, consider the implementation of `foo` that prints the thread
629it is called on and emits three numbers:
630
631<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
632
633```kotlin
634import kotlinx.coroutines.*
635import kotlinx.coroutines.flow.*
636
637fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
638
639//sampleStart
640fun foo(): Flow<Int> = flow {
641 log("Started foo flow")
642 for (i in 1..3) {
643 emit(i)
644 }
645}
646
647fun main() = runBlocking<Unit> {
648 foo().collect { value -> log("Collected $value") }
649}
650//sampleEnd
651```
652
653</div>
654
David.Watsonbb714c52019-08-30 17:49:42 +0200655> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300656
657Running this code produces:
658
659```text
660[main @coroutine#1] Started foo flow
661[main @coroutine#1] Collected 1
662[main @coroutine#1] Collected 2
663[main @coroutine#1] Collected 3
664```
665
666<!--- TEST FLEXIBLE_THREAD -->
667
668Since `foo().collect` is called from the main thread, the body of `foo`'s flow is also called in the main thread.
David.Watsonbb714c52019-08-30 17:49:42 +0200669This is the perfect default for fast-running or asynchronous code that does not care about the execution context and
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300670does not block the caller.
671
672#### Wrong emission withContext
673
674However, the long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default] and UI-updating
675code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
David.Watsonbb714c52019-08-30 17:49:42 +0200676to change the context in the code using Kotlin coroutines, but code in the `flow { ... }` builder has to honor the context
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300677preservation property and is not allowed to [emit][FlowCollector.emit] from a different context.
678
679Try running the following code:
680
681<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
682
683```kotlin
684import kotlinx.coroutines.*
685import kotlinx.coroutines.flow.*
686
687//sampleStart
688fun foo(): Flow<Int> = flow {
David.Watsonbb714c52019-08-30 17:49:42 +0200689 // The WRONG way to change context for CPU-consuming code in flow builder
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300690 kotlinx.coroutines.withContext(Dispatchers.Default) {
691 for (i in 1..3) {
692 Thread.sleep(100) // pretend we are computing it in CPU-consuming way
693 emit(i) // emit next value
694 }
695 }
696}
697
698fun main() = runBlocking<Unit> {
699 foo().collect { value -> println(value) }
700}
701//sampleEnd
702```
703
704</div>
705
David.Watsonbb714c52019-08-30 17:49:42 +0200706> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300707
708This code produces the following exception:
709
710<!--- TEST EXCEPTION
711Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
712 Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
713 but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
714 Please refer to 'flow' documentation or use 'flowOn' instead
715 at ...
716-->
717
David.Watsonbb714c52019-08-30 17:49:42 +0200718> Note that we had to use a fully qualified name of the [kotlinx.coroutines.withContext][withContext] function in this example to
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300719demonstrate this exception. A short name of `withContext` would have resolved to a special stub function that
David.Watsonbb714c52019-08-30 17:49:42 +0200720produces a compilation error to prevent us from running into this problem.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300721
722#### flowOn operator
723
David.Watsonbb714c52019-08-30 17:49:42 +0200724The exception refers to the [flowOn] function that shall be used to change the context of the flow emission.
725The correct way to change the context of a flow is shown in the example below, which also prints the
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300726names of the corresponding threads to show how it all works:
727
728<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
729
730```kotlin
731import kotlinx.coroutines.*
732import kotlinx.coroutines.flow.*
733
734fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
735
736//sampleStart
737fun foo(): Flow<Int> = flow {
738 for (i in 1..3) {
739 Thread.sleep(100) // pretend we are computing it in CPU-consuming way
740 log("Emitting $i")
741 emit(i) // emit next value
742 }
743}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
744
745fun main() = runBlocking<Unit> {
746 foo().collect { value ->
747 log("Collected $value")
748 }
749}
750//sampleEnd
751```
752
753</div>
754
David.Watsonbb714c52019-08-30 17:49:42 +0200755> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300756
757Notice how `flow { ... }` works in the background thread, while collection happens in the main thread:
758
759<!--- TEST FLEXIBLE_THREAD
760[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
761[main @coroutine#1] Collected 1
762[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
763[main @coroutine#1] Collected 2
764[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
765[main @coroutine#1] Collected 3
766-->
767
David.Watsonbb714c52019-08-30 17:49:42 +0200768Another thing to observe here is that the [flowOn] operator has changed the default sequential nature of the flow.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300769Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
David.Watsonbb714c52019-08-30 17:49:42 +0200770("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The [flowOn] operator
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300771creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context.
772
773### Buffering
774
David.Watsonbb714c52019-08-30 17:49:42 +0200775Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300776to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
David.Watsonbb714c52019-08-30 17:49:42 +0200777the emission by `foo()` flow is slow, taking 100 ms to produce an element; and collector is also slow,
778taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300779
780<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
781
782```kotlin
783import kotlinx.coroutines.*
784import kotlinx.coroutines.flow.*
785import kotlin.system.*
786
787//sampleStart
788fun foo(): Flow<Int> = flow {
789 for (i in 1..3) {
790 delay(100) // pretend we are asynchronously waiting 100 ms
791 emit(i) // emit next value
792 }
793}
794
795fun main() = runBlocking<Unit> {
796 val time = measureTimeMillis {
797 foo().collect { value ->
798 delay(300) // pretend we are processing it for 300 ms
799 println(value)
800 }
801 }
802 println("Collected in $time ms")
803}
804//sampleEnd
805```
806
807</div>
808
David.Watsonbb714c52019-08-30 17:49:42 +0200809> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300810
David.Watsonbb714c52019-08-30 17:49:42 +0200811It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300812
813```text
8141
8152
8163
817Collected in 1220 ms
818```
819
820<!--- TEST ARBITRARY_TIME -->
821
David.Watsonbb714c52019-08-30 17:49:42 +0200822We can use a [buffer] operator on a flow to run emitting code of `foo()` concurrently with collecting code,
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300823as opposed to running them sequentially:
824
825<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
826
827```kotlin
828import kotlinx.coroutines.*
829import kotlinx.coroutines.flow.*
830import kotlin.system.*
831
832fun foo(): Flow<Int> = flow {
833 for (i in 1..3) {
834 delay(100) // pretend we are asynchronously waiting 100 ms
835 emit(i) // emit next value
836 }
837}
838
839fun main() = runBlocking<Unit> {
840//sampleStart
841 val time = measureTimeMillis {
842 foo()
843 .buffer() // buffer emissions, don't wait
844 .collect { value ->
845 delay(300) // pretend we are processing it for 300 ms
846 println(value)
847 }
848 }
849 println("Collected in $time ms")
850//sampleEnd
851}
852```
853
854</div>
855
David.Watsonbb714c52019-08-30 17:49:42 +0200856> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300857
David.Watsonbb714c52019-08-30 17:49:42 +0200858It produces the same numbers just faster, as we have effectively created a processing pipeline,
859having to only wait 100 ms for the first number and then spending only 300 ms to process
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300860each number. This way it takes around 1000 ms to run:
861
862```text
8631
8642
8653
866Collected in 1071 ms
867```
868
869<!--- TEST ARBITRARY_TIME -->
870
David.Watsonbb714c52019-08-30 17:49:42 +0200871> Note that the [flowOn] operator uses the same buffering mechanism when it has to change a [CoroutineDispatcher],
872but here we explicitly request buffering without changing the execution context.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300873
874#### Conflation
875
David.Watsonbb714c52019-08-30 17:49:42 +0200876When a flow represents partial results of the operation or operation status updates, it may not be necessary
877to process each value, but instead, only most recent ones. In this case, the [conflate] operator can be used to skip
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300878intermediate values when a collector is too slow to process them. Building on the previous example:
879
880<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
881
882```kotlin
883import kotlinx.coroutines.*
884import kotlinx.coroutines.flow.*
885import kotlin.system.*
886
887fun foo(): Flow<Int> = flow {
888 for (i in 1..3) {
889 delay(100) // pretend we are asynchronously waiting 100 ms
890 emit(i) // emit next value
891 }
892}
893
894fun main() = runBlocking<Unit> {
895//sampleStart
896 val time = measureTimeMillis {
897 foo()
898 .conflate() // conflate emissions, don't process each one
899 .collect { value ->
900 delay(300) // pretend we are processing it for 300 ms
901 println(value)
902 }
903 }
904 println("Collected in $time ms")
905//sampleEnd
906}
907```
908
909</div>
910
David.Watsonbb714c52019-08-30 17:49:42 +0200911> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300912
David.Watsonbb714c52019-08-30 17:49:42 +0200913We see that while the first number was still being processed the second, and third were already produced, so
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300914the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:
915
916```text
9171
9183
919Collected in 758 ms
920```
921
922<!--- TEST ARBITRARY_TIME -->
923
924#### Processing the latest value
925
David.Watsonbb714c52019-08-30 17:49:42 +0200926Conflation is one way to speed up processing when both the emitter and collector are slow. It does it by dropping emitted values.
927The other way is to cancel a slow collector and restart it every time a new value is emitted. There is
928a family of `xxxLatest` operators that perform the same essential logic of a `xxx` operator, but cancel the
929code in their block on a new value. Let's try changing [conflate] to [collectLatest] in the previous example:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300930
931<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
932
933```kotlin
934import kotlinx.coroutines.*
935import kotlinx.coroutines.flow.*
936import kotlin.system.*
937
938fun foo(): Flow<Int> = flow {
939 for (i in 1..3) {
940 delay(100) // pretend we are asynchronously waiting 100 ms
941 emit(i) // emit next value
942 }
943}
944
945fun main() = runBlocking<Unit> {
946//sampleStart
947 val time = measureTimeMillis {
948 foo()
949 .collectLatest { value -> // cancel & restart on the latest value
950 println("Collecting $value")
951 delay(300) // pretend we are processing it for 300 ms
952 println("Done $value")
953 }
954 }
955 println("Collected in $time ms")
956//sampleEnd
957}
958```
959
960</div>
961
David.Watsonbb714c52019-08-30 17:49:42 +0200962> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300963
964Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
965is run on every value, but completes only for the last value:
966
967```text
968Collecting 1
969Collecting 2
970Collecting 3
971Done 3
972Collected in 741 ms
973```
974
975<!--- TEST ARBITRARY_TIME -->
976
977### Composing multiple flows
978
David.Watsonbb714c52019-08-30 17:49:42 +0200979There are lots of ways to compose multiple flows.
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300980
981#### Zip
982
David.Watsonbb714c52019-08-30 17:49:42 +0200983Just like the [Sequence.zip] extension function in the Kotlin standard library,
984flows have a [zip] operator that combines the corresponding values of two flows:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300985
986<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
987
988```kotlin
989import kotlinx.coroutines.*
990import kotlinx.coroutines.flow.*
991
992fun main() = runBlocking<Unit> {
993//sampleStart
994 val nums = (1..3).asFlow() // numbers 1..3
995 val strs = flowOf("one", "two", "three") // strings
996 nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
997 .collect { println(it) } // collect and print
998//sampleEnd
999}
1000```
1001
1002</div>
1003
David.Watsonbb714c52019-08-30 17:49:42 +02001004> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001005
1006This example prints:
1007
1008```text
10091 -> one
10102 -> two
10113 -> three
1012```
1013
1014<!--- TEST -->
1015
1016#### Combine
1017
David.Watsonbb714c52019-08-30 17:49:42 +02001018When flow represents the most recent value of a variable or operation (see also the related
1019section on [conflation](#conflation)), it might be needed to perform a computation that depends on
1020the most recent values of the corresponding flows and to recompute it whenever any of the upstream
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001021flows emit a value. The corresponding family of operators is called [combine].
1022
1023For example, if the numbers in the previous example update every 300ms, but strings update every 400 ms,
David.Watsonbb714c52019-08-30 17:49:42 +02001024then zipping them using the [zip] operator will still produce the same result,
1025albeit results that are printed every 400 ms:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001026
David.Watsonbb714c52019-08-30 17:49:42 +02001027> We use a [onEach] intermediate operator in this example to delay each element and make the code
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001028that emits sample flows more declarative and shorter.
1029
1030<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1031
1032```kotlin
1033import kotlinx.coroutines.*
1034import kotlinx.coroutines.flow.*
1035
1036fun main() = runBlocking<Unit> {
1037//sampleStart
1038 val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
1039 val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
1040 val startTime = System.currentTimeMillis() // remember the start time
1041 nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
1042 .collect { value -> // collect and print
1043 println("$value at ${System.currentTimeMillis() - startTime} ms from start")
1044 }
1045//sampleEnd
1046}
1047```
1048
1049</div>
1050
David.Watsonbb714c52019-08-30 17:49:42 +02001051> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001052
1053<!--- TEST ARBITRARY_TIME
10541 -> one at 437 ms from start
10552 -> two at 837 ms from start
10563 -> three at 1243 ms from start
1057-->
1058
David.Watsonbb714c52019-08-30 17:49:42 +02001059However, when using a [combine] operator here instead of a [zip]:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001060
1061<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1062
1063```kotlin
1064import kotlinx.coroutines.*
1065import kotlinx.coroutines.flow.*
1066
1067fun main() = runBlocking<Unit> {
1068//sampleStart
1069 val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
1070 val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
Roman Elizarova73862f2019-09-02 17:31:14 +03001071 val startTime = System.currentTimeMillis() // remember the start time
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001072 nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
1073 .collect { value -> // collect and print
1074 println("$value at ${System.currentTimeMillis() - startTime} ms from start")
1075 }
1076//sampleEnd
1077}
1078```
1079
1080</div>
1081
David.Watsonbb714c52019-08-30 17:49:42 +02001082> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001083
1084We get quite a different output, where a line is printed at each emission from either `nums` or `strs` flows:
1085
1086```text
10871 -> one at 452 ms from start
10882 -> one at 651 ms from start
10892 -> two at 854 ms from start
10903 -> two at 952 ms from start
10913 -> three at 1256 ms from start
1092```
1093
1094<!--- TEST ARBITRARY_TIME -->
1095
1096### Flattening flows
1097
1098Flows represent asynchronously received sequences of values, so it is quite easy to get in a situation where
1099each value triggers a request for another sequence of values. For example, we can have the following
1100function that returns a flow of two strings 500 ms apart:
1101
1102<div class="sample" markdown="1" theme="idea" data-highlight-only>
1103
1104```kotlin
1105fun requestFlow(i: Int): Flow<String> = flow {
1106 emit("$i: First")
1107 delay(500) // wait 500 ms
1108 emit("$i: Second")
1109}
1110```
1111
1112</div>
1113
1114<!--- CLEAR -->
1115
1116Now if we have a flow of three integers and call `requestFlow` for each of them like this:
1117
1118<div class="sample" markdown="1" theme="idea" data-highlight-only>
1119
1120```kotlin
1121(1..3).asFlow().map { requestFlow(it) }
1122```
1123
1124</div>
1125
1126<!--- CLEAR -->
1127
1128Then we end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for
1129further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
David.Watsonbb714c52019-08-30 17:49:42 +02001130operators for this. However, due the asynchronous nature of flows they call for different _modes_ of flattening,
1131as such, there is a family of flattening operators on flows.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001132
1133#### flatMapConcat
1134
1135Concatenating mode is implemented by [flatMapConcat] and [flattenConcat] operators. They are the most direct
David.Watsonbb714c52019-08-30 17:49:42 +02001136analogues of the corresponding sequence operators. They wait for the inner flow to complete before
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001137starting to collect the next one as the following example shows:
1138
1139<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1140
1141```kotlin
1142import kotlinx.coroutines.*
1143import kotlinx.coroutines.flow.*
1144
1145fun requestFlow(i: Int): Flow<String> = flow {
1146 emit("$i: First")
1147 delay(500) // wait 500 ms
1148 emit("$i: Second")
1149}
1150
1151fun main() = runBlocking<Unit> {
1152//sampleStart
Roman Elizarova73862f2019-09-02 17:31:14 +03001153 val startTime = System.currentTimeMillis() // remember the start time
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001154 (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
1155 .flatMapConcat { requestFlow(it) }
1156 .collect { value -> // collect and print
1157 println("$value at ${System.currentTimeMillis() - startTime} ms from start")
1158 }
1159//sampleEnd
1160}
1161```
1162
1163</div>
1164
David.Watsonbb714c52019-08-30 17:49:42 +02001165> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001166
1167The sequential nature of [flatMapConcat] is clearly seen in the output:
1168
1169```text
11701: First at 121 ms from start
11711: Second at 622 ms from start
11722: First at 727 ms from start
11732: Second at 1227 ms from start
11743: First at 1328 ms from start
11753: Second at 1829 ms from start
1176```
1177
1178<!--- TEST ARBITRARY_TIME -->
1179
1180#### flatMapMerge
1181
1182Another flattening mode is to concurrently collect all the incoming flows and merge their values into
1183a single flow so that values are emitted as soon as possible.
1184It is implemented by [flatMapMerge] and [flattenMerge] operators. They both accept an optional
1185`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
1186(it is equal to [DEFAULT_CONCURRENCY] by default).
1187
1188<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1189
1190```kotlin
1191import kotlinx.coroutines.*
1192import kotlinx.coroutines.flow.*
1193
1194fun requestFlow(i: Int): Flow<String> = flow {
1195 emit("$i: First")
1196 delay(500) // wait 500 ms
1197 emit("$i: Second")
1198}
1199
1200fun main() = runBlocking<Unit> {
1201//sampleStart
Roman Elizarova73862f2019-09-02 17:31:14 +03001202 val startTime = System.currentTimeMillis() // remember the start time
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001203 (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
1204 .flatMapMerge { requestFlow(it) }
1205 .collect { value -> // collect and print
1206 println("$value at ${System.currentTimeMillis() - startTime} ms from start")
1207 }
1208//sampleEnd
1209}
1210```
1211
1212</div>
1213
David.Watsonbb714c52019-08-30 17:49:42 +02001214> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001215
1216The concurrent nature of [flatMapMerge] is obvious:
1217
1218```text
12191: First at 136 ms from start
12202: First at 231 ms from start
12213: First at 333 ms from start
12221: Second at 639 ms from start
12232: Second at 732 ms from start
12243: Second at 833 ms from start
1225```
1226
1227<!--- TEST ARBITRARY_TIME -->
1228
David.Watsonbb714c52019-08-30 17:49:42 +02001229> Note that the [flatMapMerge] calls its block of code (`{ requestFlow(it) }` in this example) sequentially, but
1230collects the resulting flows concurrently, it is the equivalent of performing a sequential
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001231`map { requestFlow(it) }` first and then calling [flattenMerge] on the result.
1232
1233#### flatMapLatest
1234
David.Watsonbb714c52019-08-30 17:49:42 +02001235In a similar way to the [collectLatest] operator, that was shown in
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001236["Processing the latest value"](#processing-the-latest-value) section, there is the corresponding "Latest"
David.Watsonbb714c52019-08-30 17:49:42 +02001237flattening mode where a collection of the previous flow is cancelled as soon as new flow is emitted.
1238It is implemented by the [flatMapLatest] operator.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001239
1240<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1241
1242```kotlin
1243import kotlinx.coroutines.*
1244import kotlinx.coroutines.flow.*
1245
1246fun requestFlow(i: Int): Flow<String> = flow {
1247 emit("$i: First")
1248 delay(500) // wait 500 ms
1249 emit("$i: Second")
1250}
1251
1252fun main() = runBlocking<Unit> {
1253//sampleStart
Roman Elizarova73862f2019-09-02 17:31:14 +03001254 val startTime = System.currentTimeMillis() // remember the start time
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001255 (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
1256 .flatMapLatest { requestFlow(it) }
1257 .collect { value -> // collect and print
1258 println("$value at ${System.currentTimeMillis() - startTime} ms from start")
1259 }
1260//sampleEnd
1261}
1262```
1263
1264</div>
1265
David.Watsonbb714c52019-08-30 17:49:42 +02001266> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001267
David.Watsonbb714c52019-08-30 17:49:42 +02001268The output here in this example is a good demonstration of how [flatMapLatest] works:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001269
1270```text
12711: First at 142 ms from start
12722: First at 322 ms from start
12733: First at 425 ms from start
12743: Second at 931 ms from start
1275```
1276
1277<!--- TEST ARBITRARY_TIME -->
1278
1279> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) on a new value.
1280It makes no difference in this particular example, because the call to `requestFlow` itself is fast, not-suspending,
1281and cannot be cancelled. However, it would show up if we were to use suspending functions like `delay` in there.
1282
1283### Flow exceptions
1284
David.Watsonbb714c52019-08-30 17:49:42 +02001285Flow collection can complete with an exception when an emitter or code inside the operators throw an exception.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001286There are several ways to handle these exceptions.
1287
1288#### Collector try and catch
1289
1290A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions:
1291
1292<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1293
1294```kotlin
1295import kotlinx.coroutines.*
1296import kotlinx.coroutines.flow.*
1297
1298//sampleStart
1299fun foo(): Flow<Int> = flow {
1300 for (i in 1..3) {
1301 println("Emitting $i")
1302 emit(i) // emit next value
1303 }
1304}
1305
1306fun main() = runBlocking<Unit> {
1307 try {
1308 foo().collect { value ->
1309 println(value)
1310 check(value <= 1) { "Collected $value" }
1311 }
1312 } catch (e: Throwable) {
1313 println("Caught $e")
1314 }
1315}
1316//sampleEnd
1317```
1318
1319</div>
1320
David.Watsonbb714c52019-08-30 17:49:42 +02001321> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001322
1323This code successfully catches an exception in [collect] terminal operator and,
David.Watsonbb714c52019-08-30 17:49:42 +02001324as we see, no more values are emitted after that:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001325
1326```text
1327Emitting 1
13281
1329Emitting 2
13302
1331Caught java.lang.IllegalStateException: Collected 2
1332```
1333
1334<!--- TEST -->
1335
1336#### Everything is caught
1337
David.Watsonbb714c52019-08-30 17:49:42 +02001338The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators.
1339For example, let's change the code so that emitted values are [mapped][map] to strings,
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001340but the corresponding code produces an exception:
1341
1342<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1343
1344```kotlin
1345import kotlinx.coroutines.*
1346import kotlinx.coroutines.flow.*
1347
1348//sampleStart
1349fun foo(): Flow<String> =
1350 flow {
1351 for (i in 1..3) {
1352 println("Emitting $i")
1353 emit(i) // emit next value
1354 }
1355 }
1356 .map { value ->
1357 check(value <= 1) { "Crashed on $value" }
1358 "string $value"
1359 }
1360
1361fun main() = runBlocking<Unit> {
1362 try {
1363 foo().collect { value -> println(value) }
1364 } catch (e: Throwable) {
1365 println("Caught $e")
1366 }
1367}
1368//sampleEnd
1369```
1370
1371</div>
1372
David.Watsonbb714c52019-08-30 17:49:42 +02001373> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001374
1375This exception is still caught and collection is stopped:
1376
1377```text
1378Emitting 1
1379string 1
1380Emitting 2
1381Caught java.lang.IllegalStateException: Crashed on 2
1382```
1383
1384<!--- TEST -->
1385
1386### Exception transparency
1387
David.Watsonbb714c52019-08-30 17:49:42 +02001388But how can code of the emitter encapsulate its exception handling behavior?
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001389
David.Watsonbb714c52019-08-30 17:49:42 +02001390Flows must be _transparent to exceptions_ and it is a violation of the exception transparency to [emit][FlowCollector.emit] values in the
1391`flow { ... }` builder from inside of a `try/catch` block. This guarantees that a collector throwing an exception
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001392can always catch it using `try/catch` as in the previous example.
1393
David.Watsonbb714c52019-08-30 17:49:42 +02001394The emitter can use a [catch] operator that preserves this exception transparency and allows encapsulation
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001395of its exception handling. The body of the `catch` operator can analyze an exception
1396and react to it in different ways depending on which exception was caught:
1397
1398* Exceptions can be rethrown using `throw`.
1399* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
1400* Exceptions can be ignored, logged, or processed by some other code.
1401
David.Watsonbb714c52019-08-30 17:49:42 +02001402For example, let us emit the text on catching an exception:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001403
1404<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1405
1406```kotlin
1407import kotlinx.coroutines.*
1408import kotlinx.coroutines.flow.*
1409
1410fun foo(): Flow<String> =
1411 flow {
1412 for (i in 1..3) {
1413 println("Emitting $i")
1414 emit(i) // emit next value
1415 }
1416 }
1417 .map { value ->
1418 check(value <= 1) { "Crashed on $value" }
1419 "string $value"
1420 }
1421
1422fun main() = runBlocking<Unit> {
1423//sampleStart
1424 foo()
1425 .catch { e -> emit("Caught $e") } // emit on exception
1426 .collect { value -> println(value) }
1427//sampleEnd
1428}
1429```
1430
1431</div>
1432
David.Watsonbb714c52019-08-30 17:49:42 +02001433> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001434
1435The output of the example is the same, even though we do not have `try/catch` around the code anymore.
1436
1437<!--- TEST
1438Emitting 1
1439string 1
1440Emitting 2
1441Caught java.lang.IllegalStateException: Crashed on 2
1442-->
1443
1444#### Transparent catch
1445
1446The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
1447(that is an exception from all the operators above `catch`, but not below it).
1448If the block in `collect { ... }` (placed below `catch`) throws an exception then it escapes:
1449
1450<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1451
1452```kotlin
1453import kotlinx.coroutines.*
1454import kotlinx.coroutines.flow.*
1455
1456//sampleStart
1457fun foo(): Flow<Int> = flow {
1458 for (i in 1..3) {
1459 println("Emitting $i")
1460 emit(i)
1461 }
1462}
1463
1464fun main() = runBlocking<Unit> {
1465 foo()
1466 .catch { e -> println("Caught $e") } // does not catch downstream exceptions
1467 .collect { value ->
1468 check(value <= 1) { "Collected $value" }
1469 println(value)
1470 }
1471}
1472//sampleEnd
1473```
1474
1475</div>
1476
David.Watsonbb714c52019-08-30 17:49:42 +02001477> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001478
David.Watsonbb714c52019-08-30 17:49:42 +02001479A "Caught ..." message is not printed despite there being a `catch` operator:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001480
1481<!--- TEST EXCEPTION
1482Emitting 1
14831
1484Emitting 2
1485Exception in thread "main" java.lang.IllegalStateException: Collected 2
1486 at ...
1487-->
1488
1489#### Catching declaratively
1490
David.Watsonbb714c52019-08-30 17:49:42 +02001491We can combine the declarative nature of the [catch] operator with a desire to handle all the exceptions, by moving the body
1492of the [collect] operator into [onEach] and putting it before the `catch` operator. Collection of this flow must
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001493be triggered by a call to `collect()` without parameters:
1494
1495<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1496
1497```kotlin
1498import kotlinx.coroutines.*
1499import kotlinx.coroutines.flow.*
1500
1501fun foo(): Flow<Int> = flow {
1502 for (i in 1..3) {
1503 println("Emitting $i")
1504 emit(i)
1505 }
1506}
1507
1508fun main() = runBlocking<Unit> {
1509//sampleStart
1510 foo()
1511 .onEach { value ->
1512 check(value <= 1) { "Collected $value" }
1513 println(value)
1514 }
1515 .catch { e -> println("Caught $e") }
1516 .collect()
1517//sampleEnd
1518}
1519```
1520
1521</div>
1522
David.Watsonbb714c52019-08-30 17:49:42 +02001523> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001524
David.Watsonbb714c52019-08-30 17:49:42 +02001525Now we can see that a "Caught ..." message is printed and so we can catch all the exceptions without explicitly
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001526using a `try/catch` block:
1527
1528<!--- TEST EXCEPTION
1529Emitting 1
15301
1531Emitting 2
1532Caught java.lang.IllegalStateException: Collected 2
1533-->
1534
1535### Flow completion
1536
David.Watsonbb714c52019-08-30 17:49:42 +02001537When flow collection completes (normally or exceptionally) it may need to execute an action.
1538As you may have already noticed, it can be done in two ways: imperative or declarative.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001539
1540#### Imperative finally block
1541
David.Watsonbb714c52019-08-30 17:49:42 +02001542In addition to `try`/`catch`, a collector can also use a `finally` block to execute an action
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001543upon `collect` completion.
1544
1545<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1546
1547```kotlin
1548import kotlinx.coroutines.*
1549import kotlinx.coroutines.flow.*
1550
1551//sampleStart
1552fun foo(): Flow<Int> = (1..3).asFlow()
1553
1554fun main() = runBlocking<Unit> {
1555 try {
1556 foo().collect { value -> println(value) }
1557 } finally {
1558 println("Done")
1559 }
1560}
1561//sampleEnd
1562```
1563
1564</div>
1565
David.Watsonbb714c52019-08-30 17:49:42 +02001566> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001567
David.Watsonbb714c52019-08-30 17:49:42 +02001568This code prints three numbers produced by the `foo()` flow followed by a "Done" string:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001569
1570```text
15711
15722
15733
1574Done
1575```
1576
1577<!--- TEST -->
1578
1579#### Declarative handling
1580
David.Watsonbb714c52019-08-30 17:49:42 +02001581For the declarative approach, flow has [onCompletion] intermediate operator that is invoked
1582when the flow has completely collected.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001583
David.Watsonbb714c52019-08-30 17:49:42 +02001584The previous example can be rewritten using an [onCompletion] operator and produces the same output:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001585
1586<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1587
1588```kotlin
1589import kotlinx.coroutines.*
1590import kotlinx.coroutines.flow.*
1591
1592fun foo(): Flow<Int> = (1..3).asFlow()
1593
1594fun main() = runBlocking<Unit> {
1595//sampleStart
1596 foo()
1597 .onCompletion { println("Done") }
1598 .collect { value -> println(value) }
1599//sampleEnd
1600}
1601```
1602</div>
1603
David.Watsonbb714c52019-08-30 17:49:42 +02001604> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001605
1606<!--- TEST
16071
16082
16093
1610Done
1611-->
1612
1613The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
David.Watsonbb714c52019-08-30 17:49:42 +02001614to determine whether the flow collection was completed normally or exceptionally. In the following
1615example the `foo()` flow throws an exception after emitting the number 1:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001616
1617<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1618
1619```kotlin
1620import kotlinx.coroutines.*
1621import kotlinx.coroutines.flow.*
1622
1623//sampleStart
1624fun foo(): Flow<Int> = flow {
1625 emit(1)
1626 throw RuntimeException()
1627}
1628
1629fun main() = runBlocking<Unit> {
1630 foo()
1631 .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
1632 .catch { cause -> println("Caught exception") }
1633 .collect { value -> println(value) }
1634}
1635//sampleEnd
1636```
1637</div>
1638
David.Watsonbb714c52019-08-30 17:49:42 +02001639> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001640
1641As you may expect, it prints:
1642
1643```text
16441
1645Flow completed exceptionally
1646Caught exception
1647```
1648
1649<!--- TEST -->
1650
David.Watsonbb714c52019-08-30 17:49:42 +02001651The [onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001652example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
David.Watsonbb714c52019-08-30 17:49:42 +02001653and can be handled with a `catch` operator.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001654
1655#### Upstream exceptions only
1656
David.Watsonbb714c52019-08-30 17:49:42 +02001657Just like the [catch] operator, [onCompletion] only sees exceptions coming from upstream and does not
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001658see downstream exceptions. For example, run the following code:
1659
1660<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1661
1662```kotlin
1663import kotlinx.coroutines.*
1664import kotlinx.coroutines.flow.*
1665
1666//sampleStart
1667fun foo(): Flow<Int> = (1..3).asFlow()
1668
1669fun main() = runBlocking<Unit> {
1670 foo()
1671 .onCompletion { cause -> println("Flow completed with $cause") }
1672 .collect { value ->
1673 check(value <= 1) { "Collected $value" }
1674 println(value)
1675 }
1676}
1677//sampleEnd
1678```
1679
1680</div>
1681
David.Watsonbb714c52019-08-30 17:49:42 +02001682> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001683
David.Watsonbb714c52019-08-30 17:49:42 +02001684We can see the completion cause is null, yet collection failed with exception:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001685
1686```text
16871
1688Flow completed with null
1689Exception in thread "main" java.lang.IllegalStateException: Collected 2
1690```
1691
1692<!--- TEST EXCEPTION -->
1693
1694### Imperative versus declarative
1695
David.Watsonbb714c52019-08-30 17:49:42 +02001696Now we know how to collect flow, and handle its completion and exceptions in both imperative and declarative ways.
1697The natural question here is, which approach is preferred and why?
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001698As a library, we do not advocate for any particular approach and believe that both options
1699are valid and should be selected according to your own preferences and code style.
1700
1701### Launching flow
1702
David.Watsonbb714c52019-08-30 17:49:42 +02001703It is easy to use flows to represent asynchronous events that are coming from some source.
1704In this case, we need an analogue of the `addEventListener` function that registers a piece of code with a reaction
1705for incoming events and continues further work. The [onEach] operator can serve this role.
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001706However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow.
1707Otherwise, just calling `onEach` has no effect.
1708
David.Watsonbb714c52019-08-30 17:49:42 +02001709If we use the [collect] terminal operator after `onEach`, then the code after it will wait until the flow is collected:
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001710
1711<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1712
1713```kotlin
1714import kotlinx.coroutines.*
1715import kotlinx.coroutines.flow.*
1716
1717//sampleStart
1718// Imitate a flow of events
1719fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
1720
1721fun main() = runBlocking<Unit> {
1722 events()
1723 .onEach { event -> println("Event: $event") }
1724 .collect() // <--- Collecting the flow waits
1725 println("Done")
1726}
1727//sampleEnd
1728```
1729
1730</div>
1731
David.Watsonbb714c52019-08-30 17:49:42 +02001732> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001733
1734As you can see, it prints:
1735
1736```text
1737Event: 1
1738Event: 2
1739Event: 3
1740Done
1741```
1742
1743<!--- TEST -->
1744
David.Watsonbb714c52019-08-30 17:49:42 +02001745The [launchIn] terminal operator comes in handy here. By replacing `collect` with `launchIn` we can
1746launch a collection of the flow in a separate coroutine, so that execution of further code
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001747immediately continues:
1748
1749<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1750
1751```kotlin
1752import kotlinx.coroutines.*
1753import kotlinx.coroutines.flow.*
1754
1755// Imitate a flow of events
1756fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
1757
1758//sampleStart
1759fun main() = runBlocking<Unit> {
1760 events()
1761 .onEach { event -> println("Event: $event") }
1762 .launchIn(this) // <--- Launching the flow in a separate coroutine
1763 println("Done")
1764}
1765//sampleEnd
1766```
1767
1768</div>
1769
David.Watsonbb714c52019-08-30 17:49:42 +02001770> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001771
1772It prints:
1773
1774```text
1775Done
1776Event: 1
1777Event: 2
1778Event: 3
1779```
1780
1781<!--- TEST -->
1782
1783The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is
David.Watsonbb714c52019-08-30 17:49:42 +02001784launched. In the above example this scope comes from the [runBlocking]
1785coroutine builder, so while the flow is running, this [runBlocking] scope waits for completion of its child coroutine
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001786and keeps the main function from returning and terminating this example.
1787
David.Watsonbb714c52019-08-30 17:49:42 +02001788In actual applications a scope will come from an entity with a limited
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001789lifetime. As soon as the lifetime of this entity is terminated the corresponding scope is cancelled, cancelling
David.Watsonbb714c52019-08-30 17:49:42 +02001790the collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works
1791like the `addEventListener`. However, there is no need for the corresponding `removeEventListener` function,
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001792as cancellation and structured concurrency serve this purpose.
1793
David.Watsonbb714c52019-08-30 17:49:42 +02001794Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] the corresponding flow collection
Roman Elizarov3258e1f2019-08-22 20:08:48 +03001795coroutine only without cancelling the whole scope or to [join][Job.join] it.
1796
1797<!-- stdlib references -->
1798
1799[collections]: https://kotlinlang.org/docs/reference/collections-overview.html
1800[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/index.html
1801[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
1802[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/index.html
1803[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
1804[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
1805[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
1806[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html
1807
1808<!--- MODULE kotlinx-coroutines-core -->
1809<!--- INDEX kotlinx.coroutines -->
1810[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
1811[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
1812[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
1813[Dispatchers.Main]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
1814[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
1815[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
1816[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
1817[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
1818[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
1819[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
1820[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
1821<!--- INDEX kotlinx.coroutines.flow -->
1822[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
1823[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
1824[FlowCollector.emit]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
1825[collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
1826[flowOf]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
1827[map]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
1828[filter]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
1829[transform]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
1830[take]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
1831[toList]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
1832[toSet]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
1833[first]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
1834[single]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
1835[reduce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
1836[fold]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
1837[flowOn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
1838[buffer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
1839[conflate]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
1840[collectLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
1841[zip]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
1842[combine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
1843[onEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
1844[flatMapConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
1845[flattenConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
1846[flatMapMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
1847[flattenMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
1848[DEFAULT_CONCURRENCY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
1849[flatMapLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
1850[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
1851[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
1852[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
1853<!--- END -->