<!--- TEST_NAME FlowGuideTest -->

**Table of contents**

<!--- TOC -->

* [Asynchronous Flow](#asynchronous-flow)
  * [Representing multiple values](#representing-multiple-values)
    * [Sequences](#sequences)
    * [Suspending functions](#suspending-functions)
    * [Flows](#flows)
  * [Flows are cold](#flows-are-cold)
  * [Flow cancellation basics](#flow-cancellation-basics)
  * [Flow builders](#flow-builders)
  * [Intermediate flow operators](#intermediate-flow-operators)
    * [Transform operator](#transform-operator)
    * [Size-limiting operators](#size-limiting-operators)
  * [Terminal flow operators](#terminal-flow-operators)
  * [Flows are sequential](#flows-are-sequential)
  * [Flow context](#flow-context)
    * [Wrong emission withContext](#wrong-emission-withcontext)
    * [flowOn operator](#flowon-operator)
  * [Buffering](#buffering)
    * [Conflation](#conflation)
    * [Processing the latest value](#processing-the-latest-value)
  * [Composing multiple flows](#composing-multiple-flows)
    * [Zip](#zip)
    * [Combine](#combine)
  * [Flattening flows](#flattening-flows)
    * [flatMapConcat](#flatmapconcat)
    * [flatMapMerge](#flatmapmerge)
    * [flatMapLatest](#flatmaplatest)
  * [Flow exceptions](#flow-exceptions)
    * [Collector try and catch](#collector-try-and-catch)
    * [Everything is caught](#everything-is-caught)
  * [Exception transparency](#exception-transparency)
    * [Transparent catch](#transparent-catch)
    * [Catching declaratively](#catching-declaratively)
  * [Flow completion](#flow-completion)
    * [Imperative finally block](#imperative-finally-block)
    * [Declarative handling](#declarative-handling)
    * [Successful completion](#successful-completion)
  * [Imperative versus declarative](#imperative-versus-declarative)
  * [Launching flow](#launching-flow)
  * [Flow cancellation checks](#flow-cancellation-checks)
    * [Making busy flow cancellable](#making-busy-flow-cancellable)
  * [Flow and Reactive Streams](#flow-and-reactive-streams)

<!--- END -->

## Asynchronous Flow

A suspending function asynchronously returns a single value, but how can we return
multiple asynchronously computed values? This is where Kotlin Flows come in.

### Representing multiple values

Multiple values can be represented in Kotlin using [collections].
For example, we can have a `simple` function that returns a [List]
of three numbers and then print them all using [forEach]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) }
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt).

This code outputs:

```text
1
2
3
```

<!--- TEST -->

#### Sequences

If we are computing the numbers with some CPU-consuming blocking code
(each computation taking 100ms), then we can represent the numbers using a [Sequence]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) }
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt).

This code outputs the same numbers, but it waits 100ms before printing each one.

<!--- TEST
1
2
3
-->

#### Suspending functions

However, this computation blocks the main thread that is running the code.
When these values are computed by asynchronous code, we can mark the `simple` function with a `suspend` modifier,
so that it can perform its work without blocking and return the result as a list:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*

//sampleStart
suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt).

This code prints the numbers after waiting for a second.

<!--- TEST
1
2
3
-->

#### Flows

Using the `List<Int>` result type means we can only return all the values at once. To represent
the stream of values that are being asynchronously computed, we can use a [`Flow<Int>`][Flow] type, just like we would use the `Sequence<Int>` type for synchronously computed values:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).

This code waits 100ms before printing each number without blocking the main thread. This is verified
by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:

```text
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
```

<!--- TEST -->

Notice the following differences in the code with the [Flow] from the earlier examples:

* The builder function for the [Flow] type is called [flow].
* Code inside the `flow { ... }` builder block can suspend.
* The `simple` function is no longer marked with the `suspend` modifier.
* Values are _emitted_ from the flow using the [emit][FlowCollector.emit] function.
* Values are _collected_ from the flow using the [collect][collect] function.

> We can replace [delay] with `Thread.sleep` in the body of `simple`'s `flow { ... }` and see that the main
thread is blocked in this case.

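The blocking variant mentioned in the note can be sketched as follows. This is only an illustration: `blockingSimple` is a hypothetical name, not part of the guide's examples, and the only change from `simple` is that `Thread.sleep` replaces [delay].

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical blocking variant of `simple`, for comparison only
fun blockingSimple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // blocks the calling thread instead of suspending
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    // The same checker coroutine as in the example above
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    blockingSimple().collect { value -> println(value) }
}
```

On a typical run, all three numbers should appear before the first "I'm not blocked" line, because the collecting coroutine never suspends and the checker coroutine gets no chance to run on the main thread until collection is over.
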
### Flows are cold

Flows are _cold_ streams similar to sequences &mdash; the code inside a [flow] builder does not
run until the flow is collected. This becomes clear in the following example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).

Which prints:

```text
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
```

<!--- TEST -->

This is a key reason the `simple` function (which returns a flow) is not marked with the `suspend` modifier.
By itself, the `simple()` call returns quickly and does not wait for anything. The flow starts every time it is collected,
which is why we see "Flow started" when we call `collect` again.

### Flow cancellation basics

Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be
cancelled when the flow is suspended in a cancellable suspending function (like [delay]).
The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
and stops executing its code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms
        simple().collect { value -> println(value) }
    }
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).

Notice how only two numbers get emitted by the flow in the `simple` function, producing the following output:

```text
Emitting 1
1
Emitting 2
2
Done
```

<!--- TEST -->

See the [Flow cancellation checks](#flow-cancellation-checks) section for more details.

### Flow builders

The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
easier declaration of flows:

* The [flowOf] builder defines a flow that emits a fixed set of values.
* Various collections and sequences can be converted to flows using `.asFlow()` extension functions.

So, the example that prints the numbers from 1 to 3 from a flow can be written as:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).

<!--- TEST
1
2
3
-->

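The other builders listed in this section can be sketched in the same way. The function names `fixedNumbers` and `letters` below are made up for illustration; `flowOf` and the `asFlow` extensions on collections and sequences come from `kotlinx.coroutines.flow`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: a flow over a fixed set of values
fun fixedNumbers(): Flow<Int> = flowOf(1, 2, 3)

// Hypothetical helper: a list converted to a flow
fun letters(): Flow<String> = listOf("a", "b", "c").asFlow()

fun main() = runBlocking<Unit> {
    fixedNumbers().collect { value -> println(value) }
    letters().collect { value -> println(value) }
    // A sequence can be converted the same way
    sequenceOf(10, 20).asFlow().collect { value -> println(value) }
}
```

Like `flow { ... }`, these builders only describe a flow. Nothing is emitted until `collect` is called.
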
### Intermediate flow operators

Flows can be transformed with operators, just as you would with collections and sequences.
Intermediate operators are applied to an upstream flow and return a downstream flow.
These operators are cold, just like flows are. A call to such an operator is not
a suspending function itself. It works quickly, returning the definition of a new transformed flow.

The basic operators have familiar names like [map] and [filter].
The important difference from sequences is that blocks of
code inside these operators can call suspending functions.

For example, a flow of incoming requests can be
mapped to the results with the [map] operator, even when performing a request is a long-running
operation that is implemented by a suspending function:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).

It produces the following three lines, with each line appearing after a one-second delay:

```text
response 1
response 2
response 3
```

<!--- TEST -->

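The same holds for [filter]: its predicate can call suspending functions too. A minimal sketch, assuming a hypothetical `isAllowed` check that stands in for some slow asynchronous lookup:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical suspending check, standing in for a slow lookup
suspend fun isAllowed(request: Int): Boolean {
    delay(100) // imitate asynchronous work
    return request % 2 == 1
}

// Hypothetical helper: keeps only the requests that pass the check
fun allowedRequests(): Flow<Int> =
    (1..5).asFlow().filter { request -> isAllowed(request) }

fun main() = runBlocking<Unit> {
    allowedRequests().collect { request -> println("allowed $request") }
}
```

With this predicate, only the odd requests 1, 3 and 5 reach the collector.
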
#### Transform operator

Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
simple transformations like [map] and [filter], as well as implement more complex transformations.
Using the `transform` operator, we can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.

For example, using `transform` we can emit a string before performing a long-running asynchronous request
and follow it with a response:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
//sampleStart
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).

The output of this code is:

```text
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
```

<!--- TEST -->

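To see how `transform` can imitate [map] and [filter], here is a minimal sketch. The extension functions `mapVia` and `filterVia` are hypothetical names chosen for this illustration; they are not part of the library and are not how the library necessarily implements its own operators.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: map expressed through transform (emit exactly one value per input)
fun <T, R> Flow<T>.mapVia(block: suspend (T) -> R): Flow<R> =
    transform<T, R> { value -> emit(block(value)) }

// Hypothetical helper: filter expressed through transform (emit zero or one value per input)
fun <T> Flow<T>.filterVia(predicate: suspend (T) -> Boolean): Flow<T> =
    transform<T, T> { value ->
        if (predicate(value)) {
            emit(value)
        }
    }

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filterVia { it % 2 == 0 }
        .mapVia { "string $it" }
        .collect { println(it) } // prints "string 2" and "string 4"
}
```

The two helpers differ only in how many times they call `emit` for each upstream value, which is the flexibility that `transform` adds.
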
#### Size-limiting operators

Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management
functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).

The output of this code clearly shows that the execution of the `flow { ... }` body in the `numbers()` function
stopped after emitting the second number:

```text
1
2
Finally in numbers
```

<!--- TEST -->

### Terminal flow operators

Terminal operators on flows are _suspending functions_ that start a collection of the flow.
The [collect] operator is the most basic one, but there are other terminal operators that can make collection more convenient:

* Conversion to various collections like [toList] and [toSet].
* Operators to get the [first] value and to ensure that a flow emits a [single] value.
* Reducing a flow to a value with [reduce] and [fold].

For example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).

This prints a single number:

```text
55
```

<!--- TEST -->

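The remaining terminal operators from the list above can be tried in the same way. This is a small sketch rather than part of the guide's tested examples, and `oneToFive` is a hypothetical helper name:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: a flow of the numbers 1 to 5
fun oneToFive(): Flow<Int> = (1..5).asFlow()

fun main() = runBlocking<Unit> {
    println(oneToFive().toList()) // [1, 2, 3, 4, 5]
    println(flowOf(1, 1, 2).toSet()) // [1, 2]
    println(oneToFive().first()) // 1
    println(flowOf(42).single()) // 42
    // fold is like reduce, but starts from an explicit initial value
    println(oneToFive().fold(10) { acc, value -> acc + value }) // 25
}
```

Each call is a separate collection of a cold flow, so `oneToFive()` runs from the start every time a terminal operator is applied to it.
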
### Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate
on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator.
No new coroutines are launched by default.
Each emitted value is processed by all the intermediate operators from
upstream to downstream and is then delivered to the terminal operator.

See the following example that filters the even integers and maps them to strings:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).

This produces:

```text
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
```

<!--- TEST -->

### Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is
a `simple` flow, then the following code runs in the context specified
by the author of this code, regardless of the implementation details of the `simple` flow:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context
    }
}
```

</div>

<!--- CLEAR -->

This property of a flow is called _context preservation_.

So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector
of the corresponding flow. For example, consider the implementation of a `simple` function that prints the thread
it is called on and emits three numbers:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).

Running this code produces:

```text
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
```

<!--- TEST FLEXIBLE_THREAD -->

Since `simple().collect` is called from the main thread, the body of `simple`'s flow is also called in the main thread.
This is the perfect default for fast-running or asynchronous code that does not care about the execution context and
does not block the caller.

#### Wrong emission withContext

However, long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default] and UI-updating
code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
to change the context in the code using Kotlin coroutines, but code in the `flow { ... }` builder has to honor the context
preservation property and is not allowed to [emit][FlowCollector.emit] from a different context.

Try running the following code:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).

This code produces the following exception:

```text
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
        but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
        Please refer to 'flow' documentation or use 'flowOn' instead
    at ...
```

<!--- TEST EXCEPTION -->

#### flowOn operator

The exception refers to the [flowOn] function that should be used to change the context of the flow emission.
The correct way to change the context of a flow is shown in the example below, which also prints the
names of the corresponding threads to show how it all works:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).

Notice how `flow { ... }` works in the background thread, while collection happens in the main thread:

<!--- TEST FLEXIBLE_THREAD
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
-->

Another thing to observe here is that the [flowOn] operator has changed the default sequential nature of the flow.
Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The [flowOn] operator
creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context.

758### Buffering
759
David.Watsonbb714c52019-08-30 17:49:42 +0200760Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300761to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
Roman Elizarovccdc5632020-07-16 18:30:32 +0300762the emission by a `simple` flow is slow, taking 100 ms to produce an element; and collector is also slow,
David.Watsonbb714c52019-08-30 17:49:42 +0200763taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers:
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300764
765<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
766
767```kotlin
768import kotlinx.coroutines.*
769import kotlinx.coroutines.flow.*
770import kotlin.system.*
771
772//sampleStart
Roman Elizarovccdc5632020-07-16 18:30:32 +0300773fun simple(): Flow<Int> = flow {
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300774 for (i in 1..3) {
775 delay(100) // pretend we are asynchronously waiting 100 ms
776 emit(i) // emit next value
777 }
778}
779
780fun main() = runBlocking<Unit> {
781 val time = measureTimeMillis {
Roman Elizarovccdc5632020-07-16 18:30:32 +0300782 simple().collect { value ->
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300783 delay(300) // pretend we are processing it for 300 ms
784 println(value)
785 }
786 }
787 println("Collected in $time ms")
788}
789//sampleEnd
790```
791
792</div>
793
David.Watsonbb714c52019-08-30 17:49:42 +0200794> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300795
David.Watsonbb714c52019-08-30 17:49:42 +0200796It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each):
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300797
798```text
7991
8002
8013
802Collected in 1220 ms
803```
804
805<!--- TEST ARBITRARY_TIME -->
806
Roman Elizarovccdc5632020-07-16 18:30:32 +0300807We can use a [buffer] operator on a flow to run emitting code of the `simple` flow concurrently with collecting code,
Roman Elizarov3258e1f2019-08-22 20:08:48 +0300808as opposed to running them sequentially:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .buffer() // buffer emissions, don't wait
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).

It produces the same numbers just faster, as we have effectively created a processing pipeline,
having to only wait 100 ms for the first number and then spending only 300 ms to process
each number. This way it takes around 1000 ms to run:

```text
1
2
3
Collected in 1071 ms
```

<!--- TEST ARBITRARY_TIME -->

> Note that the [flowOn] operator uses the same buffering mechanism when it has to change a [CoroutineDispatcher],
but here we explicitly request buffering without changing the execution context.

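As a rough sketch of the same idea, the helper below collects the `simple` flow through `buffer()` and returns both the received values and the elapsed time. The name `collectBuffered` is hypothetical and not part of the library. The sketch assumes only that buffering keeps every value in its original order; the measured time varies from run to run.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

// Hypothetical helper: collects the buffered flow and reports what arrived and how long it took
fun collectBuffered(): Pair<List<Int>, Long> = runBlocking {
    val received = mutableListOf<Int>()
    val time = measureTimeMillis {
        simple()
            .buffer() // the emitter keeps producing while the collector is busy
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                received += value
            }
    }
    received to time
}

fun main() {
    val (values, time) = collectBuffered()
    println("Received $values in $time ms") // buffering changes the timing, not the values
}
```

Returning the values next to the time makes it easy to see that buffering affects only how long collection takes, not what is collected.
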
#### Conflation

When a flow represents partial results of an operation or operation status updates, it may not be necessary
to process each value, but instead, only the most recent ones. In this case, the [conflate] operator can be used to skip
intermediate values when a collector is too slow to process them. Building on the previous example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).

We see that while the first number was still being processed, the second and third were already produced, so
the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:

```text
1
3
Collected in 758 ms
```

<!--- TEST ARBITRARY_TIME -->

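To make the effect easier to observe, here is a small sketch with a faster emitter and a collector that is ten times slower. The helper name `collectConflated` and the chosen delays are assumptions made for illustration. Which intermediate values get skipped depends on timing, so the sketch relies only on two properties: the values arrive in their original order, and the most recent value is never dropped.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: a fast emitter feeding a slow collector through conflate()
fun collectConflated(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    (1..10).asFlow()
        .onEach { delay(10) } // a new value roughly every 10 ms
        .conflate() // keep only the most recent unprocessed value
        .collect { value ->
            delay(100) // the collector is ten times slower than the emitter
            received += value
        }
    received
}

fun main() {
    val values = collectConflated()
    println(values) // which intermediate values are skipped depends on timing
    println("Last value delivered: ${values.last()}")
}
```
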
#### Processing the latest value

Conflation is one way to speed up processing when both the emitter and collector are slow. It does so by dropping emitted values.
The other way is to cancel a slow collector and restart it every time a new value is emitted. There is
a family of `xxxLatest` operators that perform the same essential logic of a `xxx` operator, but cancel the
code in their block on a new value. Let's try changing [conflate] to [collectLatest] in the previous example:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }
    println("Collected in $time ms")
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).

Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
is run on every value, but completes only for the last value:

```text
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
```

<!--- TEST ARBITRARY_TIME -->

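Another member of the `xxxLatest` family is `mapLatest`. The following sketch assumes a hypothetical helper `mapOnlyLatest` and made-up string values. Because the values arrive back to back while the mapping block suspends for 100 ms, every computation except the one for the last value is cancelled before it finishes. Depending on the library version, `mapLatest` may be marked as experimental.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: only the computation for the latest value runs to completion
fun mapOnlyLatest(): List<String> = runBlocking {
    flowOf("a", "ab", "abc") // values arrive back to back, with no delay in between
        .mapLatest { query ->
            delay(100) // pretend the computation takes 100 ms; it is cancelled by a newer value
            "result for $query"
        }
        .toList()
}

fun main() {
    println(mapOnlyLatest())
}
```
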
### Composing multiple flows

There are lots of ways to compose multiple flows.

#### Zip

Just like the [Sequence.zip] extension function in the Kotlin standard library,
flows have a [zip] operator that combines the corresponding values of two flows:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).

This example prints:

```text
1 -> one
2 -> two
3 -> three
```

<!--- TEST -->

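Like its sequence counterpart, `zip` pairs values only while both flows still have something to offer. The sketch below uses a hypothetical helper `zipUneven` that zips five numbers with only two strings, so the result stops after the second pair.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: zips five numbers with only two strings
fun zipUneven(): List<String> = runBlocking {
    val nums = (1..5).asFlow() // five numbers
    val strs = flowOf("one", "two") // only two strings
    nums.zip(strs) { a, b -> "$a -> $b" } // pairs are formed only while both flows have values
        .toList()
}

fun main() {
    zipUneven().forEach { println(it) } // prints two lines, the remaining numbers are never paired
}
```
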
#### Combine

When a flow represents the most recent value of a variable or operation (see also the related
section on [conflation](#conflation)), it might be necessary to perform a computation that depends on
the most recent values of the corresponding flows and to recompute it whenever any of the upstream
flows emits a value. The corresponding family of operators is called [combine].

For example, if the numbers in the previous example update every 300 ms, but strings update every 400 ms,
then zipping them using the [zip] operator will still produce the same result,
albeit with results printed every 400 ms:

> We use an [onEach] intermediate operator in this example to delay each element and make the code
that emits sample flows more declarative and shorter.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).

<!--- TEST ARBITRARY_TIME
1 -> one at 437 ms from start
2 -> two at 837 ms from start
3 -> three at 1243 ms from start
-->

However, when using a [combine] operator here instead of a [zip]:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
//sampleStart
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).

We get quite a different output, where a line is printed at each emission from either `nums` or `strs` flows:

```text
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
```

<!--- TEST ARBITRARY_TIME -->

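The order and approximate timing of these lines follow from simple arithmetic. The sketch below does not use the coroutines library at all. It is a simplified model, with a hypothetical function `combineTimeline` and hypothetical parameters, that merges the emission times of both inputs and pairs the latest values on every emission once each side has produced at least one value.

```kotlin
// A simplified model of `combine` timing, not the library implementation.
// Each input is assumed to emit its i-th value at (i + 1) * period milliseconds.
fun combineTimeline(
    nums: List<Int>, numPeriod: Long,
    strs: List<String>, strPeriod: Long
): List<String> {
    // Build (time, isNumber, index) events for both inputs
    val events = nums.indices.map { Triple((it + 1) * numPeriod, true, it) } +
        strs.indices.map { Triple((it + 1) * strPeriod, false, it) }
    var latestNum: Int? = null
    var latestStr: String? = null
    val output = mutableListOf<String>()
    // Replay the events in time order, remembering the latest value of each input
    for ((time, isNumber, index) in events.sortedBy { it.first }) {
        if (isNumber) latestNum = nums[index] else latestStr = strs[index]
        // A pair is produced only after both inputs have emitted at least once
        if (latestNum != null && latestStr != null) {
            output += "$latestNum -> $latestStr at about $time ms"
        }
    }
    return output
}

fun main() {
    combineTimeline(listOf(1, 2, 3), 300, listOf("one", "two", "three"), 400)
        .forEach { println(it) }
}
```

With periods of 300 ms and 400 ms the model yields pairs at about 400, 600, 800, 900, and 1200 ms, which matches the order of the measured output above. The real timings are slightly larger because of scheduling overhead.
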
### Flattening flows

Flows represent asynchronously received sequences of values, so it is quite easy to get in a situation where
each value triggers a request for another sequence of values. For example, we can have the following
function that returns a flow of two strings 500 ms apart:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}
```

</div>

<!--- CLEAR -->

Now if we have a flow of three integers and call `requestFlow` for each of them like this:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
(1..3).asFlow().map { requestFlow(it) }
```

</div>

<!--- CLEAR -->

Then we end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for
further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
operators for this. However, due to the asynchronous nature of flows, they call for different _modes_ of flattening,
and hence there is a family of flattening operators on flows.

#### flatMapConcat

Concatenating mode is implemented by [flatMapConcat] and [flattenConcat] operators. They are the most direct
analogues of the corresponding sequence operators. They wait for the inner flow to complete before
starting to collect the next one as the following example shows:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).

The sequential nature of [flatMapConcat] is clearly seen in the output:

```text
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
```

<!--- TEST ARBITRARY_TIME -->

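Since both operators implement the same concatenating mode, applying `flatMapConcat` should give the same result as mapping first and then calling `flattenConcat`. The sketch below checks this with two hypothetical helpers, `viaFlatMapConcat` and `viaMapThenFlatten`, and a shortened delay to keep the run quick. Depending on the library version, these operators may be marked as experimental or preview API.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // a shorter delay than in the sample above, to keep the run quick
    emit("$i: Second")
}

// Hypothetical helpers that collect both variants into lists for comparison
fun viaFlatMapConcat(): List<String> = runBlocking {
    (1..3).asFlow().flatMapConcat { requestFlow(it) }.toList()
}

fun viaMapThenFlatten(): List<String> = runBlocking {
    (1..3).asFlow().map { requestFlow(it) }.flattenConcat().toList()
}

fun main() {
    println(viaFlatMapConcat()) // all values of one inner flow come before the next one starts
    println(viaFlatMapConcat() == viaMapThenFlatten())
}
```
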
#### flatMapMerge

Another flattening mode is to concurrently collect all the incoming flows and merge their values into
a single flow so that values are emitted as soon as possible.
It is implemented by [flatMapMerge] and [flattenMerge] operators. They both accept an optional
`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
(it is equal to [DEFAULT_CONCURRENCY] by default).

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).

The concurrent nature of [flatMapMerge] is obvious:

```text
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that the [flatMapMerge] calls its block of code (`{ requestFlow(it) }` in this example) sequentially, but
collects the resulting flows concurrently. It is the equivalent of performing a sequential
`map { requestFlow(it) }` first and then calling [flattenMerge] on the result.

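The effect of the `concurrency` parameter can be sketched as follows. The helper `mergedWith` is hypothetical, and the delay is shortened. With a limit of 1 only one inner flow is collected at a time, so the output order is the same as in concatenating mode. With a larger limit the inner flows overlap and the exact interleaving depends on timing, although the same set of values is delivered.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // a shorter delay than in the sample above, to keep the run quick
    emit("$i: Second")
}

// Hypothetical helper: merges the inner flows with the given concurrency limit
fun mergedWith(concurrency: Int): List<String> = runBlocking {
    (1..3).asFlow()
        .flatMapMerge(concurrency) { requestFlow(it) }
        .toList()
}

fun main() {
    println(mergedWith(concurrency = 1)) // one inner flow at a time, so the order is sequential
    println(mergedWith(concurrency = 16)) // inner flows overlap; the exact order depends on timing
}
```
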
#### flatMapLatest

In a similar way to the [collectLatest] operator, which was shown in the
["Processing the latest value"](#processing-the-latest-value) section, there is the corresponding "Latest"
flattening mode where the collection of the previous flow is cancelled as soon as a new flow is emitted.
It is implemented by the [flatMapLatest] operator.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
//sampleStart
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).

The output in this example is a good demonstration of how [flatMapLatest] works:

```text
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
```

<!--- TEST ARBITRARY_TIME -->

> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) on a new value.
It makes no difference in this particular example, because the call to `requestFlow` itself is fast, non-suspending,
and cannot be cancelled. However, it would show up if we were to use suspending functions like `delay` in there.

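The difference mentioned in the note can be sketched by putting a `delay` into the block itself. In the hypothetical helper `latestWithSlowBlock` below, the values arrive back to back while the block suspends for 100 ms before returning the inner flow. The blocks for the first two values are therefore cancelled before they even produce a flow, and only the inner flow for the last value is collected.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(50) // a shorter delay than in the sample above, to keep the run quick
    emit("$i: Second")
}

// Hypothetical helper: the block itself suspends before it returns the inner flow
fun latestWithSlowBlock(): List<String> = runBlocking {
    (1..3).asFlow() // values arrive back to back
        .flatMapLatest { value ->
            delay(100) // suspending here makes the block cancellable by the next value
            requestFlow(value)
        }
        .toList()
}

fun main() {
    println(latestWithSlowBlock())
}
```
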
### Flow exceptions

Flow collection can complete with an exception when an emitter or code inside the operators throws an exception.
There are several ways to handle these exceptions.

#### Collector try and catch

A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).

This code successfully catches an exception in the [collect] terminal operator and,
as we see, no more values are emitted after that:

```text
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
```

<!--- TEST -->

#### Everything is caught

The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators.
For example, let's change the code so that emitted values are [mapped][map] to strings,
but the corresponding code produces an exception:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).

This exception is still caught and collection is stopped:

```text
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
```

<!--- TEST -->

### Exception transparency

But how can the code of the emitter encapsulate its exception handling behavior?

Flows must be _transparent to exceptions_ and it is a violation of the exception transparency to [emit][FlowCollector.emit] values in the
`flow { ... }` builder from inside of a `try/catch` block. This guarantees that a collector throwing an exception
can always catch it using `try/catch` as in the previous example.

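One way to stay within this rule is to guard only the emitter's own computation and keep the call to `emit` outside of the `try/catch` block. The following is a minimal sketch under that assumption. The function `parsed` and the fallback value `-1` are hypothetical and chosen only for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical emitter: guards only its own parsing step with try/catch
fun parsed(inputs: List<String>): Flow<Int> = flow {
    for (input in inputs) {
        val number = try {
            input.toInt() // only this computation is wrapped
        } catch (e: NumberFormatException) {
            -1 // hypothetical fallback value for unparsable input
        }
        emit(number) // emit stays outside try/catch, so collector exceptions pass through
    }
}

fun main() = runBlocking<Unit> {
    println(parsed(listOf("1", "x", "3")).toList())
    try {
        parsed(listOf("1", "2")).collect { value ->
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: IllegalStateException) {
        println("Caught $e") // the collector's own exception reaches its try/catch
    }
}
```
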
The emitter can use a [catch] operator that preserves this exception transparency and allows encapsulation
of its exception handling. The body of the `catch` operator can analyze an exception
and react to it in different ways depending on which exception was caught:

* Exceptions can be rethrown using `throw`.
* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
* Exceptions can be ignored, logged, or processed by some other code.

For example, let us emit the text on catching an exception:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).

The output of the example is the same, even though we do not have `try/catch` around the code anymore.

<!--- TEST
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
-->

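The three reactions listed in this section (rethrowing, emitting a value, and ignoring or logging) can also be combined in a single `catch` block that decides by exception type. The sketch below is only an illustration. The functions `failingWith`, `handled`, and `collectHandled`, as well as the mapping from exception types to reactions, are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical source: emits one value and then fails with the given exception
fun failingWith(e: Throwable): Flow<String> = flow {
    emit("value")
    throw e
}

// Hypothetical policy that reacts differently depending on the exception type
fun Flow<String>.handled(): Flow<String> = catch { e ->
    when (e) {
        is IllegalStateException -> emit("fallback") // turn the exception into a value
        is IllegalArgumentException -> println("Ignoring $e") // log and complete normally
        else -> throw e // rethrow anything else to the collector
    }
}

fun collectHandled(e: Throwable): List<String> = runBlocking {
    failingWith(e).handled().toList()
}

fun main() {
    println(collectHandled(IllegalStateException("bad state")))
    println(collectHandled(IllegalArgumentException("bad argument")))
    try {
        collectHandled(UnsupportedOperationException("unsupported"))
    } catch (e: UnsupportedOperationException) {
        println("Rethrown: $e")
    }
}
```
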
#### Transparent catch

The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
(that is, exceptions from all the operators above `catch`, but not below it).
If the block in `collect { ... }` (placed below `catch`) throws an exception then it escapes:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).

A "Caught ..." message is not printed despite there being a `catch` operator:

<!--- TEST EXCEPTION
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
	at ...
-->

#### Catching declaratively

We can combine the declarative nature of the [catch] operator with a desire to handle all the exceptions, by moving the body
of the [collect] operator into [onEach] and putting it before the `catch` operator. Collection of this flow must
be triggered by a call to `collect()` without parameters:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
//sampleEnd
}
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).

Now we can see that a "Caught ..." message is printed and so we can catch all the exceptions without explicitly
using a `try/catch` block:

<!--- TEST EXCEPTION
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
-->

### Flow completion

When flow collection completes (normally or exceptionally) it may need to execute an action.
As you may have already noticed, it can be done in two ways: imperative or declarative.

#### Imperative finally block

In addition to `try`/`catch`, a collector can also use a `finally` block to execute an action
upon `collect` completion.

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).

This code prints three numbers produced by the `simple` flow followed by a "Done" string:

```text
1
2
3
Done
```

<!--- TEST -->

#### Declarative handling

For the declarative approach, flow has an [onCompletion] intermediate operator that is invoked
when the flow has been completely collected.

The previous example can be rewritten using an [onCompletion] operator and produces the same output:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
//sampleStart
    simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
//sampleEnd
}
```
</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).

<!--- TEST
1
2
3
Done
-->

The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
to determine whether the flow collection was completed normally or exceptionally. In the following
example the `simple` flow throws an exception after emitting the number 1:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
//sampleEnd
```
</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).

As you may expect, it prints:

```text
1
Flow completed exceptionally
Caught exception
```

<!--- TEST -->

The [onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
and can be handled with a `catch` operator.

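To make the propagation concrete, here is a minimal sketch that chains several `onCompletion` operators around a `catch`. The `trace` helper, the log strings and the `"boom"` message are illustrative assumptions, not part of the guide's examples:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException("boom") // hypothetical failure, for illustration only
}

// Hypothetical helper: records what each operator observes, in order
fun trace(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    simple()
        .onCompletion { cause -> log.add("first onCompletion: ${cause?.message}") }
        .onCompletion { cause -> log.add("second onCompletion: ${cause?.message}") }
        .catch { cause -> log.add("catch: ${cause.message}") }
        .onCompletion { cause -> log.add("onCompletion after catch: ${cause?.message}") }
        .collect { value -> log.add("collected $value") }
    log
}

fun main() {
    trace().forEach(::println)
}
```

Both `onCompletion` operators placed before `catch` observe the exception. The one placed after `catch` receives `null`, because from its point of view the upstream flow (which now includes the `catch`) completed normally.
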
#### Successful completion

Another difference from the [catch] operator is that [onCompletion] sees all exceptions and receives
a `null` exception only on successful completion of the upstream flow (without cancellation or failure).

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).

We can see the completion cause is not null, because the flow was aborted due to a downstream exception:

```text
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
```

<!--- TEST EXCEPTION -->

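For contrast, the following sketch shows the successful case, where nothing fails upstream or downstream and `onCompletion` receives `null`. The `completionCause` helper is a hypothetical name introduced only for this illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = (1..3).asFlow()

// Hypothetical helper: collects the flow and returns the cause seen by onCompletion
fun completionCause(): Throwable? = runBlocking {
    var observed: Throwable? = null
    simple()
        .onCompletion { cause -> observed = cause }
        .collect { value -> println(value) }
    observed
}

fun main() {
    println("Flow completed with ${completionCause()}")
}
```

Here all three values are collected without an exception, so the last line reads `Flow completed with null`.
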
### Imperative versus declarative

Now we know how to collect a flow, and handle its completion and exceptions in both imperative and declarative ways.
The natural question here is, which approach is preferred and why?
As a library, we do not advocate for any particular approach and believe that both options
are valid and should be selected according to your own preferences and code style.

### Launching flow

It is easy to use flows to represent asynchronous events that are coming from some source.
In this case, we need an analogue of the `addEventListener` function that registers a piece of code with a reaction
for incoming events and continues further work. The [onEach] operator can serve this role.
However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow.
Otherwise, just calling `onEach` has no effect.

If we use the [collect] terminal operator after `onEach`, then the code after it will wait until the flow is collected:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).

As you can see, it prints:

```text
Event: 1
Event: 2
Event: 3
Done
```

<!--- TEST -->

The [launchIn] terminal operator comes in handy here. By replacing `collect` with `launchIn` we can
launch a collection of the flow in a separate coroutine, so that execution of further code
immediately continues:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

//sampleStart
fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).

It prints:

```text
Done
Event: 1
Event: 2
Event: 3
```

<!--- TEST -->

The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is
launched. In the above example this scope comes from the [runBlocking]
coroutine builder, so while the flow is running, this [runBlocking] scope waits for completion of its child coroutine
and keeps the main function from returning and terminating this example.

In actual applications a scope will come from an entity with a limited
lifetime. As soon as the lifetime of this entity is terminated the corresponding scope is cancelled, cancelling
the collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works
like the `addEventListener`. However, there is no need for the corresponding `removeEventListener` function,
as cancellation and structured concurrency serve this purpose.

Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] only the corresponding flow
collection coroutine, without cancelling the whole scope, or to [join][Job.join] it.

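A minimal sketch of using the returned [Job] is shown below. The endless `ticks` flow, the `cancelOnlyTheCollector` helper and the 350 ms wait are assumptions made for illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical endless event source, for illustration only
fun ticks(): Flow<Int> = flow {
    var i = 1
    while (true) {
        delay(100)
        emit(i++)
    }
}

// Returns true when the collector was cancelled while the enclosing scope stayed active
fun cancelOnlyTheCollector(): Boolean = runBlocking {
    val job: Job = ticks()
        .onEach { tick -> println("Tick: $tick") }
        .launchIn(this)
    delay(350)   // let a few ticks arrive
    job.cancel() // stops only this collection
    job.join()   // waits until the collecting coroutine has finished
    job.isCancelled && isActive
}

fun main() {
    println("Only the collector was cancelled: ${cancelOnlyTheCollector()}")
}
```

Without the `cancel` call this example would never finish, because `runBlocking` waits for its child coroutine and the `ticks` flow never completes on its own.
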
### Flow cancellation checks

For convenience, the [flow] builder performs additional [ensureActive] checks for cancellation on each emitted value.
It means that a busy loop emitting from a `flow { ... }` is cancellable:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt).

We get only numbers up to 3 and a [CancellationException] after trying to emit number 4:

```text
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
```

<!--- TEST EXCEPTION -->

However, most other flow operators do not do additional cancellation checks on their own for performance reasons.
For example, if you use the [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere,
then there are no checks for cancellation:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt).

All numbers from 1 to 5 are collected and cancellation gets detected only before return from `runBlocking`:

```text
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
```

<!--- TEST EXCEPTION -->

#### Making busy flow cancellable

In the case where you have a busy loop with coroutines, you must explicitly check for cancellation.
You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use
[cancellable] operator provided to do that:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt).

With the `cancellable` operator only the numbers from 1 to 3 are collected:

```text
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
```

<!--- TEST EXCEPTION -->

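For comparison, the manual `onEach` variant mentioned above can be sketched as follows. The `collectUntilCancelled` helper and the `try`/`catch` around `runBlocking` are assumptions added here so that the collected values can be inspected after cancellation:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the values collected before cancellation was detected
fun collectUntilCancelled(): List<Int> {
    val collected = mutableListOf<Int>()
    try {
        runBlocking<Unit> {
            (1..5).asFlow()
                .onEach { currentCoroutineContext().ensureActive() } // explicit check per value
                .collect { value ->
                    if (value == 3) cancel()
                    collected += value
                }
        }
    } catch (e: CancellationException) {
        // runBlocking rethrows the cancellation of its own scope; ignored in this sketch
    }
    return collected
}

fun main() {
    println(collectUntilCancelled())
}
```

As with `cancellable`, only the numbers from 1 to 3 are collected, because the check in `onEach` fails before the number 4 reaches the collector.
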
### Flow and Reactive Streams

For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and Project Reactor,
the design of Flow may look very familiar.

Indeed, its design was inspired by Reactive Streams and its various implementations. But Flow's main goal is to have as simple a design as possible,
be Kotlin and suspension friendly, and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in the [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article.

While being different, conceptually, Flow *is* a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa.
Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2`/`kotlinx-coroutines-rx3` for RxJava2/RxJava3).
Integration modules include conversions from and to `Flow`, integration with Reactor's `Context` and suspension-friendly ways to work with various reactive entities.

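As a rough sketch of such a conversion, the following round trip goes from a `Flow` to a Reactive Streams `Publisher` and back. It assumes that the `kotlinx-coroutines-reactive` module (and with it the Reactive Streams API) is on the classpath; the `roundTrip` helper is a hypothetical name used only here:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher

// Hypothetical helper: Flow -> Publisher -> Flow, then gather the values
fun roundTrip(): List<Int> = runBlocking {
    val publisher: Publisher<Int> = (1..3).asFlow().asPublisher()
    publisher.asFlow().toList()
}

fun main() {
    println(roundTrip())
}
```

The values survive the round trip unchanged, so this prints `[1, 2, 3]`.
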
<!-- stdlib references -->

[collections]: https://kotlinlang.org/docs/reference/collections-overview.html
[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/index.html
[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/index.html
[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Dispatchers.Main]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
[ensureActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html
<!--- INDEX kotlinx.coroutines.flow -->
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
[FlowCollector.emit]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
[collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
[flowOf]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
[map]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
[filter]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
[transform]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
[take]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
[toList]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
[toSet]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
[first]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
[single]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
[reduce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
[fold]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
[flowOn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
[buffer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
[conflate]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
[collectLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
[zip]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
[combine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
[onEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
[flatMapConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
[flattenConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
[flatMapMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
[flattenMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
[DEFAULT_CONCURRENCY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
[flatMapLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
[IntRange.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/kotlin.ranges.-int-range/as-flow.html
[cancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html
<!--- END -->