Improve the docs and guide on flow cancellation (#2043)

* Improve the docs and guide on flow cancellation
* Remove the outdated phrase "flow infrastructure does not introduce additional cancellation points". Such checks were introduced by #2028.
* Add a section on "Flow cancellation checks" with examples of the `Flow.cancellable()` operator.
* Add a bit more detail in `flow` and `cancellable` docs with links to `ensureActive()`.
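
For reference, the manual check that the new guide section mentions (`.onEach { currentCoroutineContext().ensureActive() }`) can be sketched roughly as below. This is an illustration only and not part of the patch: `collectWithManualCheck` is a hypothetical helper name, and the flow is collected in a child coroutine so that cancelling it does not cancel the caller.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper (not part of kotlinx.coroutines): collects 1..5 with a
// manual cancellation check before each element is passed downstream.
suspend fun collectWithManualCheck(): List<Int> = coroutineScope {
    val collected = mutableListOf<Int>()
    val job = launch {
        (1..5).asFlow()
            // Throws CancellationException once the collecting coroutine is cancelled
            .onEach { currentCoroutineContext().ensureActive() }
            .collect { value ->
                if (value == 3) cancel() // cancels only the launched child coroutine
                collected += value
            }
    }
    job.join() // the child finishes as cancelled; the caller keeps running
    collected
}

fun main() = runBlocking<Unit> {
    println(collectWithManualCheck()) // prints [1, 2, 3]
}
```

Replacing the `onEach` line with `.cancellable()` should give the same result, which is the shorthand the guide recommends.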
diff --git a/docs/flow.md b/docs/flow.md
index 3f14b10..9b33158 100644
--- a/docs/flow.md
+++ b/docs/flow.md
@@ -10,7 +10,7 @@
     * [Suspending functions](#suspending-functions)
     * [Flows](#flows)
   * [Flows are cold](#flows-are-cold)
-  * [Flow cancellation](#flow-cancellation)
+  * [Flow cancellation basics](#flow-cancellation-basics)
   * [Flow builders](#flow-builders)
   * [Intermediate flow operators](#intermediate-flow-operators)
     * [Transform operator](#transform-operator)
@@ -42,6 +42,8 @@
     * [Successful completion](#successful-completion)
   * [Imperative versus declarative](#imperative-versus-declarative)
   * [Launching flow](#launching-flow)
+  * [Flow cancellation checks](#flow-cancellation-checks)
+    * [Making busy flow cancellable](#making-busy-flow-cancellable)
   * [Flow and Reactive Streams](#flow-and-reactive-streams)
 
 <!--- END -->
@@ -267,12 +269,10 @@
 By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
 that is why we see "Flow started" when we call `collect` again.
 
-### Flow cancellation
+### Flow cancellation basics
 
-Flow adheres to the general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
-additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be 
-cancelled when the flow is suspended in a cancellable suspending function (like [delay]), and cannot be cancelled otherwise.
-
+Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be 
+cancelled when the flow is suspended in a cancellable suspending function (like [delay]).
 The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block 
 and stops executing its code:
 
@@ -316,6 +316,8 @@
 
 <!--- TEST -->
 
+See the [Flow cancellation checks](#flow-cancellation-checks) section for more details.
+
 ### Flow builders
 
 The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
@@ -1777,6 +1779,127 @@
 Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] the corresponding flow collection
 coroutine only without cancelling the whole scope or to [join][Job.join] it.
  
+### Flow cancellation checks
+
+For convenience, the [flow] builder performs additional [ensureActive] checks for cancellation on each emitted value. 
+This means that a busy loop emitting from a `flow { ... }` builder is cancellable:
+ 
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart           
+fun foo(): Flow<Int> = flow { 
+    for (i in 1..5) {
+        println("Emitting $i") 
+        emit(i) 
+    }
+}
+
+fun main() = runBlocking<Unit> {
+    foo().collect { value -> 
+        if (value == 3) cancel()  
+        println(value)
+    } 
+}
+//sampleEnd
+```  
+
+</div>
+
+> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt).
+
+We get only numbers up to 3 and a [CancellationException] after trying to emit number 4:
+
+```text 
+Emitting 1
+1
+Emitting 2
+2
+Emitting 3
+3
+Emitting 4
+Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
+```
+
+<!--- TEST EXCEPTION -->
+
+However, most other flow operators do not do additional cancellation checks on their own for performance reasons. 
+For example, if you use the [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere,
+then there are no checks for cancellation:
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart           
+fun main() = runBlocking<Unit> {
+    (1..5).asFlow().collect { value -> 
+        if (value == 3) cancel()  
+        println(value)
+    } 
+}
+//sampleEnd
+```  
+
+</div>
+
+> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt).
+
+All numbers from 1 to 5 are collected, and cancellation is detected only before returning from `runBlocking`:
+
+```text
+1
+2
+3
+4
+5
+Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
+```
+
+<!--- TEST EXCEPTION -->
+
+#### Making busy flow cancellable 
+
+If you have a busy loop with coroutines, you must explicitly check for cancellation.
+You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use
+[cancellable] operator provided to do that:
+
+<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart           
+fun main() = runBlocking<Unit> {
+    (1..5).asFlow().cancellable().collect { value -> 
+        if (value == 3) cancel()  
+        println(value)
+    } 
+}
+//sampleEnd
+```  
+
+</div>
+
+> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt).
+
+With the `cancellable` operator only the numbers from 1 to 3 are collected:
+
+```text
+1
+2
+3
+Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
+```
+
+<!--- TEST EXCEPTION -->
+ 
 ### Flow and Reactive Streams
 
 For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and project Reactor, 
@@ -1813,6 +1936,8 @@
 [Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
 [Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
 [Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
+[ensureActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html
+[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html
 <!--- INDEX kotlinx.coroutines.flow -->
 [Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
 [flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
@@ -1845,4 +1970,6 @@
 [catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
 [onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
 [launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
+[IntRange.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/kotlin.ranges.-int-range/as-flow.html
+[cancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html
 <!--- END -->