blob: 28b319d80c7cc863b26494a6874f89d0d9a6314c [file] [log] [blame]
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +03001/*
2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
5
6package kotlinx.coroutines.flow.internal
7
8import kotlinx.coroutines.*
9import kotlinx.coroutines.channels.*
10import kotlinx.coroutines.flow.*
11import kotlinx.coroutines.internal.Symbol
12import kotlinx.coroutines.selects.*
13
14internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
15
16internal suspend inline fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
17 first: Flow<T1>, second: Flow<T2>,
18 crossinline transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
19) {
20 coroutineScope {
21 val firstChannel = asFairChannel(first)
22 val secondChannel = asFairChannel(second)
23 var firstValue: Any? = null
24 var secondValue: Any? = null
25 var firstIsClosed = false
26 var secondIsClosed = false
27 while (!firstIsClosed || !secondIsClosed) {
28 select<Unit> {
29 onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
30 firstValue = value
31 if (secondValue !== null) {
32 transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2)
33 }
34 }
35
36 onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
37 secondValue = value
38 if (firstValue !== null) {
39 transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2)
40 }
41 }
42 }
43 }
44 }
45}
46
47@PublishedApi
48internal fun <T, R> combine(
49 vararg flows: Flow<T>,
50 arrayFactory: () -> Array<T?>,
51 transform: suspend FlowCollector<R>.(Array<T>) -> Unit
52): Flow<R> = flow {
53 coroutineScope {
54 val size = flows.size
55 val channels =
56 Array(size) { asFairChannel(flows[it]) }
57 val latestValues = arrayOfNulls<Any?>(size)
58 val isClosed = Array(size) { false }
59
60 // See flow.combine(other) for explanation.
61 while (!isClosed.all { it }) {
62 select<Unit> {
63 for (i in 0 until size) {
64 onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
65 latestValues[i] = value
66 if (latestValues.all { it !== null }) {
67 val arguments = arrayFactory()
68 for (index in 0 until size) {
69 arguments[index] = NULL.unbox(latestValues[index])
70 }
71 transform(arguments as Array<T>)
72 }
73 }
74 }
75 }
76 }
77 }
78}
79
80private inline fun SelectBuilder<Unit>.onReceive(
81 isClosed: Boolean,
82 channel: ReceiveChannel<Any>,
83 crossinline onClosed: () -> Unit,
84 noinline onReceive: suspend (value: Any) -> Unit
85) {
86 if (isClosed) return
87 channel.onReceiveOrNull {
88 // TODO onReceiveOrClosed when boxing issues are fixed
89 if (it === null) onClosed()
90 else onReceive(it)
91 }
92}
93
94// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
95private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
96 val channel = channel as ChannelCoroutine<Any>
97 flow.collect { value ->
98 return@collect channel.sendFair(value ?: NULL)
99 }
100}
101
102internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = unsafeFlow {
103 coroutineScope {
104 val first = asChannel(flow)
105 val second = asChannel(flow2)
106 /*
107 * This approach only works with rendezvous channel and is required to enforce correctness
108 * in the following scenario:
109 * ```
110 * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
111 * val f2 = flowOf(1)
112 * f1.zip(f2) { ... }
113 * ```
114 *
115 * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
116 */
117 (second as SendChannel<*>).invokeOnClose {
118 if (!first.isClosedForReceive) first.cancel(AbortFlowException())
119 }
120
121 val otherIterator = second.iterator()
122 try {
123 first.consumeEach { value ->
124 if (!otherIterator.hasNext()) {
125 return@consumeEach
126 }
127 emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
128 }
129 } catch (e: AbortFlowException) {
130 // complete
131 } finally {
132 if (!second.isClosedForReceive) second.cancel(AbortFlowException())
133 }
134 }
135}
136
137// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
138private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
139 flow.collect { value ->
140 return@collect channel.send(value ?: NULL)
141 }
142}