Vsevolod Tolstopyatov | e2a5671 | 2019-06-05 18:40:18 +0300 | [diff] [blame] | 1 | /* |
Aurimas Liutikas | 79e555e | 2021-05-17 17:41:41 +0000 | [diff] [blame] | 2 | * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
Vsevolod Tolstopyatov | e2a5671 | 2019-06-05 18:40:18 +0300 | [diff] [blame] | 3 | */ |
| 4 | |
| 5 | package kotlinx.coroutines.flow.internal |
| 6 | |
| 7 | import kotlinx.coroutines.* |
| 8 | import kotlinx.coroutines.channels.* |
| 9 | import kotlinx.coroutines.flow.* |
| 10 | import kotlinx.coroutines.internal.* |
| 11 | import kotlinx.coroutines.intrinsics.* |
| 12 | import kotlin.coroutines.* |
| 13 | import kotlin.coroutines.intrinsics.* |
Vsevolod Tolstopyatov | dc4a474 | 2019-07-19 15:56:48 +0300 | [diff] [blame] | 14 | import kotlinx.coroutines.flow.internal.unsafeFlow as flow |
Vsevolod Tolstopyatov | e2a5671 | 2019-06-05 18:40:18 +0300 | [diff] [blame] | 15 | |
/**
 * Runs [block] inside a freshly created [CoroutineScope] and returns its result.
 *
 * Behaves like [coroutineScope], except that cancellation is *tied* between the scope and its children:
 * when one of the children becomes cancelled, the scope itself is cancelled as well.
 *
 * For example:
 * ```
 * flowScope {
 *     launch {
 *         throw CancellationException()
 *     }
 * } // <- CE will be rethrown here
 * ```
 */
internal suspend fun <R> flowScope(@BuilderInference block: suspend CoroutineScope.() -> R): R =
    suspendCoroutineUninterceptedOrReturn { continuation ->
        // The coroutine acts both as the scope handed to [block] and as the receiver it is started on.
        FlowCoroutine(continuation.context, continuation).let { scope ->
            scope.startUndispatchedOrReturn(scope, block)
        }
    }
| 35 | |
/**
 * Builds a flow whose [block] runs inside its own [CoroutineScope] on every collection.
 *
 * Equivalent to:
 * ```
 * flow {
 *     flowScope {
 *         ...
 *     }
 * }
 * ```
 * with an additional constraint on cancellation.
 * To cancel a child without cancelling the scope itself, use `cancel(ChildCancelledException())`.
 */
internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(FlowCollector<R>) -> Unit): Flow<R> =
    flow {
        // Capture the collector up front so the nested scope lambda can pass it to [block].
        val collector = this
        flowScope { block(collector) }
    }
| 53 | |
/**
 * Starts [block] as a producer coroutine in this scope and returns the channel it sends to.
 *
 * A [Channel] with the given [capacity] is created first, then the coroutine context is derived from
 * this scope and [context] via [newCoroutineContext]. The resulting [FlowProduceCoroutine] is started
 * with [CoroutineStart.ATOMIC] and returned as the [ReceiveChannel].
 */
internal fun <T> CoroutineScope.flowProduce(
    context: CoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): ReceiveChannel<T> {
    val buffer = Channel<T>(capacity)
    return FlowProduceCoroutine(newCoroutineContext(context), buffer).also { producer ->
        producer.start(CoroutineStart.ATOMIC, producer, block)
    }
}
| 65 | |
/**
 * [ScopeCoroutine] used by `flowScope`.
 *
 * Its [childCancelled] returns `true` for a [ChildCancelledException] without calling [cancelImpl];
 * any other cause is forwarded to [cancelImpl].
 */
private class FlowCoroutine<T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    // Short-circuit: cancelImpl is only reached when the cause is not a ChildCancelledException.
    public override fun childCancelled(cause: Throwable): Boolean =
        cause is ChildCancelledException || cancelImpl(cause)
}
Vsevolod Tolstopyatov | 0342a0a | 2019-08-22 15:20:59 +0300 | [diff] [blame] | 75 | |
/**
 * [ProducerCoroutine] used by `flowProduce`.
 *
 * Its [childCancelled] returns `true` for a [ChildCancelledException] without calling [cancelImpl];
 * any other cause is forwarded to [cancelImpl].
 */
private class FlowProduceCoroutine<T>(
    parentContext: CoroutineContext,
    channel: Channel<T>
) : ProducerCoroutine<T>(parentContext, channel) {
    // Short-circuit: cancelImpl is only reached when the cause is not a ChildCancelledException.
    public override fun childCancelled(cause: Throwable): Boolean =
        cause is ChildCancelledException || cancelImpl(cause)
}