blob: bbdebd08b959ee7d2219a4feca8e4cc98cace861 [file] [log] [blame]
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +03001/*
Aurimas Liutikasc8879d62021-05-12 21:56:16 +00002 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +03003 */
4@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
5
6package kotlinx.coroutines.flow.internal
7
8import kotlinx.coroutines.*
9import kotlinx.coroutines.channels.*
10import kotlinx.coroutines.flow.*
Vsevolod Tolstopyatovd5338482019-08-06 16:08:54 +030011import kotlinx.coroutines.internal.*
Vsevolod Tolstopyatov95875902020-10-20 02:40:38 -070012import kotlin.coroutines.*
13import kotlin.coroutines.intrinsics.*
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +030014
Vsevolod Tolstopyatov95875902020-10-20 02:40:38 -070015private typealias Update = IndexedValue<Any?>
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +030016
17@PublishedApi
Vsevolod Tolstopyatovd5338482019-08-06 16:08:54 +030018internal suspend fun <R, T> FlowCollector<R>.combineInternal(
19 flows: Array<out Flow<T>>,
Vsevolod Tolstopyatov95875902020-10-20 02:40:38 -070020 arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +030021 transform: suspend FlowCollector<R>.(Array<T>) -> Unit
Vsevolod Tolstopyatov95875902020-10-20 02:40:38 -070022): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
Vsevolod Tolstopyatov340d5012019-10-21 17:29:11 +030023 val size = flows.size
Vsevolod Tolstopyatov95875902020-10-20 02:40:38 -070024 if (size == 0) return@flowScope // bail-out for empty input
Vsevolod Tolstopyatov340d5012019-10-21 17:29:11 +030025 val latestValues = arrayOfNulls<Any?>(size)
Aurimas Liutikasc8879d62021-05-12 21:56:16 +000026 latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster that Array(size) { UNINITIALIZED }
Vsevolod Tolstopyatov95875902020-10-20 02:40:38 -070027 val resultChannel = Channel<Update>(size)
28 val nonClosed = LocalAtomicInt(size)
29 var remainingAbsentValues = size
30 for (i in 0 until size) {
31 // Coroutine per flow that keeps track of its value and sends result to downstream
32 launch {
33 try {
34 flows[i].collect { value ->
35 resultChannel.send(Update(i, value))
36 yield() // Emulate fairness, giving each flow chance to emit
37 }
38 } finally {
39 // Close the channel when there is no more flows
40 if (nonClosed.decrementAndGet() == 0) {
41 resultChannel.close()
42 }
43 }
44 }
45 }
46
47 /*
48 * Batch-receive optimization: read updates in batches, but bail-out
49 * as soon as we encountered two values from the same source
50 */
51 val lastReceivedEpoch = ByteArray(size)
52 var currentEpoch: Byte = 0
53 while (true) {
54 ++currentEpoch
55 // Start batch
56 // The very first receive in epoch should be suspending
57 var element = resultChannel.receiveOrNull() ?: break // Channel is closed, nothing to do here
58 while (true) {
59 val index = element.index
60 // Update values
61 val previous = latestValues[index]
62 latestValues[index] = element.value
63 if (previous === UNINITIALIZED) --remainingAbsentValues
64 // Check epoch
65 // Received the second value from the same flow in the same epoch -- bail out
66 if (lastReceivedEpoch[index] == currentEpoch) break
67 lastReceivedEpoch[index] = currentEpoch
68 element = resultChannel.poll() ?: break
69 }
70
71 // Process batch result if there is enough data
72 if (remainingAbsentValues == 0) {
73 /*
74 * If arrayFactory returns null, then we can avoid array copy because
75 * it's our own safe transformer that immediately deconstructs the array
76 */
77 val results = arrayFactory()
78 if (results == null) {
79 transform(latestValues as Array<T>)
80 } else {
81 (latestValues as Array<T?>).copyInto(results)
82 transform(results as Array<T>)
83 }
84 }
85 }
86}
87
88internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
89 unsafeFlow {
90 coroutineScope {
91 val second = produce<Any> {
92 flow2.collect { value ->
93 return@collect channel.send(value ?: NULL)
94 }
95 }
96
97 /*
98 * This approach only works with rendezvous channel and is required to enforce correctness
99 * in the following scenario:
100 * ```
101 * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
102 * val f2 = flowOf(1)
103 * f1.zip(f2) { ... }
104 * ```
105 *
106 * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
107 */
108 val collectJob = Job()
109 (second as SendChannel<*>).invokeOnClose {
110 // Optimization to avoid AFE allocation when the other flow is done
111 if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
112 }
113
114 try {
115 /*
116 * Non-trivial undispatched (because we are in the right context and there is no structured concurrency)
117 * hierarchy:
118 * -Outer coroutineScope that owns the whole zip process
119 * - First flow is collected by the child of coroutineScope, collectJob.
120 * So it can be safely cancelled as soon as the second flow is done
121 * - **But** the downstream MUST NOT be cancelled when the second flow is done,
122 * so we emit to downstream from coroutineScope job.
123 * Typically, such hierarchy requires coroutine for collector that communicates
124 * with coroutines scope via a channel, but it's way too expensive, so
125 * we are using this trick instead.
126 */
127 val scopeContext = coroutineContext
128 val cnt = threadContextElements(scopeContext)
129 withContextUndispatched(coroutineContext + collectJob, Unit) {
130 flow.collect { value ->
131 withContextUndispatched(scopeContext, Unit, cnt) {
132 val otherValue = second.receiveOrNull() ?: throw AbortFlowException(this@unsafeFlow)
133 emit(transform(value, NULL.unbox(otherValue)))
134 }
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +0300135 }
136 }
Vsevolod Tolstopyatov95875902020-10-20 02:40:38 -0700137 } catch (e: AbortFlowException) {
138 e.checkOwnership(owner = this@unsafeFlow)
139 } finally {
Aurimas Liutikasc8879d62021-05-12 21:56:16 +0000140 if (!second.isClosedForReceive) second.cancel()
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +0300141 }
142 }
143 }