Improve the docs and guide on flow cancellation (#2043)
* Improve the docs and guide on flow cancellation
* Remove outdated phrase on "flow infrastructure does not introduce additional cancellation points". They were introduced by #2028
* Add a section on "Flow cancellation checks" with examples of the `Flow.cancellable()` operator.
* Add a bit more detail in `flow` and `cancellable` docs with links to `ensureActive()`.
diff --git a/docs/flow.md b/docs/flow.md
index 3f14b10..9b33158 100644
--- a/docs/flow.md
+++ b/docs/flow.md
@@ -10,7 +10,7 @@
* [Suspending functions](#suspending-functions)
* [Flows](#flows)
* [Flows are cold](#flows-are-cold)
- * [Flow cancellation](#flow-cancellation)
+ * [Flow cancellation basics](#flow-cancellation-basics)
* [Flow builders](#flow-builders)
* [Intermediate flow operators](#intermediate-flow-operators)
* [Transform operator](#transform-operator)
@@ -42,6 +42,8 @@
* [Successful completion](#successful-completion)
* [Imperative versus declarative](#imperative-versus-declarative)
* [Launching flow](#launching-flow)
+ * [Flow cancellation checks](#flow-cancellation-checks)
+ * [Making busy flow cancellable](#making-busy-flow-cancellable)
* [Flow and Reactive Streams](#flow-and-reactive-streams)
<!--- END -->
@@ -267,12 +269,10 @@
By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
that is why we see "Flow started" when we call `collect` again.
-### Flow cancellation
+### Flow cancellation basics
-Flow adheres to the general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
-additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be
-cancelled when the flow is suspended in a cancellable suspending function (like [delay]), and cannot be cancelled otherwise.
-
+Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be
+cancelled when the flow is suspended in a cancellable suspending function (like [delay]).
The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
and stops executing its code:
@@ -316,6 +316,8 @@
<!--- TEST -->
+See the [Flow cancellation checks](#flow-cancellation-checks) section for more details.
+
### Flow builders
The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
@@ -1777,6 +1779,127 @@
Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] the corresponding flow collection
coroutine only without cancelling the whole scope or to [join][Job.join] it.
+### Flow cancellation checks
+
+For convenience, the [flow] builder performs additional [ensureActive] checks for cancellation on each emitted value.
+This means that a busy loop emitting from a `flow { ... }` is cancellable:
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun foo(): Flow<Int> = flow {
+    for (i in 1..5) {
+        println("Emitting $i")
+        emit(i)
+    }
+}
+
+fun main() = runBlocking<Unit> {
+    foo().collect { value ->
+        if (value == 3) cancel()
+        println(value)
+    }
+}
+//sampleEnd
+```
+
+</div>
+
+> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt).
+
+We get only numbers up to 3 and a [CancellationException] after trying to emit number 4:
+
+```text
+Emitting 1
+1
+Emitting 2
+2
+Emitting 3
+3
+Emitting 4
+Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
+```
+
+<!--- TEST EXCEPTION -->
+
+However, for performance reasons, most other flow operators do not perform additional cancellation checks on their own.
+For example, if you use the [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere,
+then there are no checks for cancellation:
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun main() = runBlocking<Unit> {
+    (1..5).asFlow().collect { value ->
+        if (value == 3) cancel()
+        println(value)
+    }
+}
+//sampleEnd
+```
+
+</div>
+
+> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt).
+
+All numbers from 1 to 5 are collected, and cancellation is detected only just before `runBlocking` returns:
+
+```text
+1
+2
+3
+4
+5
+Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
+```
+
+<!--- TEST EXCEPTION -->
+
+#### Making busy flow cancellable
+
+If you have a busy loop with coroutines, you must explicitly check for cancellation.
+You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use
+[cancellable] operator provided to do that:
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun main() = runBlocking<Unit> {
+    (1..5).asFlow().cancellable().collect { value ->
+        if (value == 3) cancel()
+        println(value)
+    }
+}
+//sampleEnd
+```
+
+</div>
+
+> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt).
+
+With the `cancellable` operator, only the numbers from 1 to 3 are collected:
+
+```text
+1
+2
+3
+Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
+```
+
+<!--- TEST EXCEPTION -->
+
### Flow and Reactive Streams
For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and project Reactor,
@@ -1813,6 +1936,8 @@
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
+[ensureActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html
+[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html
<!--- INDEX kotlinx.coroutines.flow -->
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
@@ -1845,4 +1970,6 @@
[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
+[IntRange.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/kotlin.ranges.-int-range/as-flow.html
+[cancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html
<!--- END -->