blob: b395525620944b2f891e50589e76749913d5db87 [file] [log] [blame]
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.flow.internal
6
7import kotlinx.coroutines.*
8import kotlinx.coroutines.channels.*
9import kotlinx.coroutines.flow.*
10import kotlinx.coroutines.internal.*
11import kotlinx.coroutines.intrinsics.*
12import kotlin.coroutines.*
13import kotlin.coroutines.intrinsics.*
Vsevolod Tolstopyatovdc4a4742019-07-19 15:56:48 +030014import kotlinx.coroutines.flow.internal.unsafeFlow as flow
Vsevolod Tolstopyatove2a56712019-06-05 18:40:18 +030015
/**
 * Runs [block] with a freshly created [CoroutineScope] as its receiver and returns the block's result.
 *
 * The builder resembles [coroutineScope], with one difference: the lifecycle of the scope is *tied* to the
 * lifecycle of its children with respect to cancellation, so the scope itself becomes cancelled
 * when one of its children is cancelled.
 *
 * For example:
 * ```
 * flowScope {
 *     launch {
 *         throw CancellationException()
 *     }
 * } // <- CE will be rethrown here
 * ```
 *
 * Implementation: a [FlowCoroutine] is built from the caller's continuation and its context, and [block]
 * is started on it via `startUndispatchedOrReturn`, whose result is handed back to
 * [suspendCoroutineUninterceptedOrReturn].
 *
 * @param block the suspending body executed with the new scope as receiver.
 * @return the value produced by [block].
 */
internal suspend fun <R> flowScope(@BuilderInference block: suspend CoroutineScope.() -> R): R =
    suspendCoroutineUninterceptedOrReturn { continuation ->
        FlowCoroutine(continuation.context, continuation).let { scope ->
            // The coroutine serves both as the thing being started and as the receiver of the block.
            scope.startUndispatchedOrReturn(scope, block)
        }
    }
35
/**
 * Builds a flow whose [block] receives a [CoroutineScope] as receiver (one per collection)
 * together with the downstream [FlowCollector].
 *
 * It is a shorthand for:
 * ```
 * flow {
 *     flowScope {
 *         ...
 *     }
 * }
 * ```
 * with an additional constraint on cancellation: to cancel a child without cancelling the scope itself,
 * `cancel(ChildCancelledException())` should be used.
 *
 * @param block the body of the flow; its receiver is the scope created by [flowScope] and its argument
 * is the collector of the enclosing flow.
 * @return a flow that runs [block] inside [flowScope] on every collection.
 */
internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(FlowCollector<R>) -> Unit): Flow<R> {
    return flow {
        // Capture the collector before entering flowScope, where `this` becomes the CoroutineScope.
        val downstream = this
        flowScope { block(downstream) }
    }
}
53
/**
 * Starts a [FlowProduceCoroutine] that runs [block] and returns it as a [ReceiveChannel].
 *
 * A new [Channel] with the given [capacity] is created first, then the coroutine context is derived
 * from the receiver scope with [newCoroutineContext], and finally the coroutine is started with
 * [CoroutineStart.ATOMIC], acting as the receiver of [block].
 *
 * @param context additional context passed to [newCoroutineContext].
 * @param capacity capacity passed to the [Channel] factory; defaults to `0`.
 * @param block the producer body executed with the coroutine as its [ProducerScope].
 * @return the started coroutine, exposed as a [ReceiveChannel].
 */
internal fun <T> CoroutineScope.flowProduce(
    context: CoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): ReceiveChannel<T> {
    // The channel is created before the context is computed, matching the original evaluation order.
    val buffer = Channel<T>(capacity)
    return FlowProduceCoroutine(newCoroutineContext(context), buffer).also { producer ->
        producer.start(CoroutineStart.ATOMIC, producer, block)
    }
}
65
/**
 * [ScopeCoroutine] used by [flowScope]; it customises how the cancellation of a child is handled.
 *
 * @param context the context the coroutine is created with.
 * @param uCont the continuation passed through to [ScopeCoroutine].
 */
private class FlowCoroutine<T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    /**
     * Reports a [ChildCancelledException] as handled (`true`) without cancelling this coroutine;
     * for any other [cause] the decision is delegated to `cancelImpl`, whose result is returned.
     */
    public override fun childCancelled(cause: Throwable): Boolean =
        // Short-circuit: cancelImpl is not invoked for ChildCancelledException.
        cause is ChildCancelledException || cancelImpl(cause)
}
Vsevolod Tolstopyatov0342a0a2019-08-22 15:20:59 +030075
/**
 * [ProducerCoroutine] created by [flowProduce]; it customises how the cancellation of a child is handled.
 *
 * @param parentContext the context passed through to [ProducerCoroutine].
 * @param channel the channel passed through to [ProducerCoroutine].
 */
private class FlowProduceCoroutine<T>(
    parentContext: CoroutineContext,
    channel: Channel<T>
) : ProducerCoroutine<T>(parentContext, channel) {
    /**
     * Reports a [ChildCancelledException] as handled (`true`) without cancelling this coroutine;
     * for any other [cause] the decision is delegated to `cancelImpl`, whose result is returned.
     */
    public override fun childCancelled(cause: Throwable): Boolean =
        // Short-circuit: cancelImpl is not invoked for ChildCancelledException.
        cause is ChildCancelledException || cancelImpl(cause)
}